From bdfb88538dd3454ff1e523eacf5f4f0a3728960f Mon Sep 17 00:00:00 2001 From: Vladimir Orany Date: Thu, 24 Oct 2024 11:37:48 +0200 Subject: [PATCH] batch send sqs messages (#269) * low-level method to send messages in batch * batch publish in declarative clients --- docs/guide/src/docs/asciidoc/sqs.adoc | 4 +- .../micronaut-amazon-awssdk-sqs.gradle | 1 + .../awssdk/sqs/DefaultSimpleQueueService.java | 60 +++++++++++-- .../awssdk/sqs/QueueClientIntroduction.java | 32 ++++++- .../amazon/awssdk/sqs/SimpleQueueService.java | 84 +++++++++++++++++++ .../groovy/SimpleQueueServiceExtensions.java | 38 +++++++++ .../amazon/awssdk/sqs/DefaultClient.java | 7 +- .../amazon/awssdk/sqs/QueueClientSpec.groovy | 25 ++++++ .../awssdk/sqs/SimpleQueueServiceSpec.groovy | 16 ++++ 9 files changed, 255 insertions(+), 12 deletions(-) diff --git a/docs/guide/src/docs/asciidoc/sqs.adoc b/docs/guide/src/docs/asciidoc/sqs.adoc index 715843d37..88b2523e0 100644 --- a/docs/guide/src/docs/asciidoc/sqs.adoc +++ b/docs/guide/src/docs/asciidoc/sqs.adoc @@ -92,7 +92,9 @@ include::{root-dir}/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/ <6> You can publish a string with custom delay <7> You can publish a string with custom FIFO queue group <8> You can publish a string with custom delay and FIFO queue group -<9> You can delete published message using the message ID if the +<9> You can send multiple messages at once when the argument is `Publisher` +<10> If the return type is also publisher type then **you need to subscribe to the publisher to actually send the messages** +<11> You can delete published message using the message ID if the [source,java,indent=0,options="nowrap",role="secondary"] .Publishing String Records (AWS SDK 1.x) diff --git a/subprojects/micronaut-amazon-awssdk-sqs/micronaut-amazon-awssdk-sqs.gradle b/subprojects/micronaut-amazon-awssdk-sqs/micronaut-amazon-awssdk-sqs.gradle index ac201b5e8..c0c0c7577 100644 --- a/subprojects/micronaut-amazon-awssdk-sqs/micronaut-amazon-awssdk-sqs.gradle +++ b/subprojects/micronaut-amazon-awssdk-sqs/micronaut-amazon-awssdk-sqs.gradle @@ -23,6 +23,7 @@ dependencies { implementation "space.jasan:groovy-closure-support:$closureSupportVersion" implementation 'io.micronaut.validation:micronaut-validation' implementation 'io.micronaut:micronaut-jackson-databind' + implementation 'io.micronaut.reactor:micronaut-reactor' testImplementation project(':micronaut-amazon-awssdk-integration-testing') } diff --git a/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/DefaultSimpleQueueService.java b/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/DefaultSimpleQueueService.java index b51d70633..ecd1b4756 100644 --- a/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/DefaultSimpleQueueService.java +++ b/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/DefaultSimpleQueueService.java @@ -18,8 +18,10 @@ package com.agorapulse.micronaut.amazon.awssdk.sqs; import io.micronaut.core.util.StringUtils; +import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.sqs.SqsClient; @@ -278,6 +280,54 @@ public String sendMessage(String queueName, String messageBody, int delaySeconds return messageId; } + @Override + public Publisher sendMessages(String queueName, Publisher messageBodies, int delaySeconds, String groupId) { + String queueUrl = getQueueUrl(queueName); + + return Flux.from(messageBodies).map(messageBody -> { + SendMessageBatchRequestEntry.Builder request = SendMessageBatchRequestEntry + .builder() + .id(UUID.randomUUID().toString()) + .messageBody(messageBody); + + if (delaySeconds > 0) { + request.delaySeconds(delaySeconds); + } + + if (StringUtils.isNotEmpty(groupId)) { + request.messageGroupId(groupId); + } + + return request.build(); + }).buffer(10).map(batch -> { + SendMessageBatchRequest.Builder request = SendMessageBatchRequest.builder().queueUrl(queueUrl).entries(batch); + SendMessageBatchResponse response = client.sendMessageBatch(request.build()); + return response.successful().stream().map(SendMessageBatchResultEntry::messageId).toList(); + }).flatMap(Flux::fromIterable); + } + + @Override + public Publisher sendMessages(String queueName, Publisher messageBodies, Consumer messageConfiguration) { + String queueUrl = getQueueUrl(queueName); + + return Flux.from(messageBodies).map(messageBody -> { + SendMessageBatchRequestEntry.Builder request = SendMessageBatchRequestEntry.builder().messageBody(messageBody); + messageConfiguration.accept(request); + return request.build(); + }).buffer(10).flatMap(batch -> Flux.>generate(synchronousSink -> { + SendMessageBatchRequest.Builder request = SendMessageBatchRequest.builder().queueUrl(queueUrl).entries(batch); + SendMessageBatchResponse response = client.sendMessageBatch(request.build()); + if (response.failed().isEmpty()) { + synchronousSink.next(response.successful().stream().map(SendMessageBatchResultEntry::messageId).toList()); + synchronousSink.complete(); + } else { + synchronousSink.error(new IllegalArgumentException("Following messages were not sent:\n" + response.failed().stream().map(e -> + String.format("Message %s failed with code %s and message %s%n", e.id(), e.code(), e.message()) + ).toList())); + } + })).flatMap(Flux::fromIterable); + } + /** * @param queueName * @param messageBody @@ -319,9 +369,7 @@ private void addQueue(String queueUrl) { throw new IllegalStateException("Queue URL cannot be null or empty"); } - synchronized (queueUrlByNames) { - queueUrlByNames.put(getQueueNameFromUrl(queueUrl), queueUrl); - } + queueUrlByNames.put(getQueueNameFromUrl(queueUrl), queueUrl); } private void loadQueues() { @@ -340,10 +388,8 @@ private void loadQueues() { ) ); - synchronized (queueUrlByNames) { queueUrlByNames.clear(); queueUrlByNames.putAll(queueUrls); - } } private void removeQueue(String queueUrl) { @@ -351,9 +397,7 @@ private void removeQueue(String queueUrl) { throw new IllegalStateException("Queue URL cannot be null or empty"); } - synchronized (queueUrlByNames) { - queueUrlByNames.remove(getQueueNameFromUrl(queueUrl)); - } + queueUrlByNames.remove(getQueueNameFromUrl(queueUrl)); } } diff --git a/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/QueueClientIntroduction.java b/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/QueueClientIntroduction.java index e5048e653..f2ccb6bae 100644 --- a/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/QueueClientIntroduction.java +++ b/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/QueueClientIntroduction.java @@ -28,9 +28,12 @@ import io.micronaut.context.BeanContext; import io.micronaut.context.annotation.Requires; import io.micronaut.core.annotation.AnnotationValue; +import io.micronaut.core.async.publisher.Publishers; import io.micronaut.core.type.Argument; import io.micronaut.core.util.StringUtils; import io.micronaut.inject.qualifiers.Qualifiers; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException; @@ -108,7 +111,7 @@ public Object intercept(MethodInvocationContext context) { } private Object doIntercept(MethodInvocationContext context, SimpleQueueService service, String queueName, String group, Integer delay) { - Argument[] arguments = context.getArguments(); + Argument[] arguments = context.getArguments(); Map params = context.getParameterValueMap(); if (arguments.length == 1 && context.getMethodName().startsWith("delete")) { @@ -141,6 +144,27 @@ private Object doIntercept(MethodInvocationContext context, Simp return service.sendMessage(queueName, new String((byte[]) message), delay, group); } + if (Publisher.class.isAssignableFrom(messageType)) { + Publisher messageIdsPublisher; + + if (queueArguments.message.getTypeParameters()[0].equalsType(Argument.STRING)) { + messageIdsPublisher = service.sendMessages(queueName, (Publisher) message, delay, group); + } else { + messageIdsPublisher = service.sendMessages(queueName, Flux.from((Publisher) message).map(this::convertMessageToJson), delay, group); + } + + if (context.getReturnType().asArgument().isVoid()) { + Flux.from(messageIdsPublisher).subscribe(); + return null; + } + + if (Publishers.isConvertibleToPublisher(context.getReturnType().getType())) { + return Publishers.convertPublisher(beanContext.getConversionService(), messageIdsPublisher, context.getReturnType().getType()); + } + + return beanContext.getConversionService().convert(messageIdsPublisher, context.getReturnType().getType()); + } + return sendJson(service, queueName, message, delay, group); } @@ -148,8 +172,12 @@ private Object doIntercept(MethodInvocationContext context, Simp } private String sendJson(SimpleQueueService service, String queueName, Object message, int delay, String group) { + return service.sendMessage(queueName, convertMessageToJson(message), delay, group); + } + + private String convertMessageToJson(Object message) { try { - return service.sendMessage(queueName, objectMapper.writeValueAsString(message), delay, group); + return objectMapper.writeValueAsString(message); } catch (JsonProcessingException e) { throw new IllegalArgumentException("Failed to marshal " + message + " to JSON", e); } diff --git a/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/SimpleQueueService.java b/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/SimpleQueueService.java index 368413a0d..72068cbf9 100644 --- a/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/SimpleQueueService.java +++ b/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/SimpleQueueService.java @@ -17,8 +17,10 @@ */ package com.agorapulse.micronaut.amazon.awssdk.sqs; +import org.reactivestreams.Publisher; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.QueueAttributeName; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; import java.util.List; @@ -288,4 +290,86 @@ default String sendMessage(String messageBody, Consumer messageConfiguration); + + /** + * Send message immediately + * @param queueName the name of the queue + * @param messageBodies the message bodies to be sent + * @return the publisher of message ids that must be subscribed in order to send the messages + */ + default Publisher sendMessages(String queueName, Publisher messageBodies) { + return sendMessages(queueName, messageBodies, 0); + } + + /** + * Send message with given delay. + * @param queueName the name of the queue + * @param messageBodies the message bodies to be sent + * @param delaySeconds the delay in seconds + * @param groupId group id for FIFO queues + * @return the publisher of message ids that must be subscribed in order to send the messages + */ + Publisher sendMessages(String queueName, Publisher messageBodies, int delaySeconds, String groupId); + + /** + * Send message with given delay. + * @param queueName the name of the queue + * @param messageBodies the message bodies to be sent + * @param delaySeconds the delay in seconds + * @return the publisher of message ids that must be subscribed in order to send the messages + */ + default Publisher sendMessages(String queueName, Publisher messageBodies, int delaySeconds) { + return sendMessages(queueName, messageBodies, delaySeconds, null); + } + + /** + * Send message in default queue immediately + * @param messageBodies the message bodies to be sent + * @return the publisher of message ids that must be subscribed in order to send the messages + */ + default Publisher sendMessages(Publisher messageBodies) { + return sendMessages(getDefaultQueueName(), messageBodies); + } + + /** + * Send message in the default queue with given delay. + * @param messageBodies the message bodies to be sent + * @param delaySeconds the delay in seconds + * @return the publisher of message ids that must be subscribed in order to send the messages + */ + default Publisher sendMessages(Publisher messageBodies, int delaySeconds) { + return sendMessages(getDefaultQueueName(), messageBodies, delaySeconds); + } + + /** + * Send message with given delay. + * @param messageBodies the message bodies to be sent + * @param delaySeconds the delay in seconds + * @param groupId group id for FIFO queues + * @return the publisher of message ids that must be subscribed in order to send the messages + */ + default Publisher sendMessage(Publisher messageBodies, int delaySeconds, String groupId) { + return sendMessages(getDefaultQueueName(), messageBodies, delaySeconds, groupId); + } + + /** + * Sends message with additional configuration into the default queue. + * @param messageBodies the message bodies to be sent + * @param messageConfiguration additional configuration + * @return the publisher of message ids that must be subscribed in order to send the messages + */ + default Publisher sendMessages(Publisher messageBodies, Consumer messageConfiguration) { + return sendMessages(getDefaultQueueName(), messageBodies, messageConfiguration); + } + + /** + * Sends message with additional configuration into the given queue. + * @param queueName name of the queue + * @param messageBodies the message bodies to be sent + * @param messageConfiguration additional configuration + * @return the publisher of message ids that must be subscribed in order to send the messages + */ + Publisher sendMessages(String queueName, Publisher messageBodies, Consumer messageConfiguration); + + } diff --git a/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/groovy/SimpleQueueServiceExtensions.java b/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/groovy/SimpleQueueServiceExtensions.java index 4164e9f24..87cf34b73 100644 --- a/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/groovy/SimpleQueueServiceExtensions.java +++ b/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/groovy/SimpleQueueServiceExtensions.java @@ -23,6 +23,8 @@ import groovy.lang.DelegatesTo; import groovy.transform.stc.ClosureParams; import groovy.transform.stc.FromString; +import org.reactivestreams.Publisher; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; import space.jasan.support.groovy.closure.ConsumerWithDelegate; @@ -69,4 +71,40 @@ public static String sendMessage( return self.sendMessage(queueName, messageBody, ConsumerWithDelegate.create(messageConfiguration)); } + /** + * Sends message with additional configuration into the default queue. + * + * @param messageBodies message bodies + * @param messageConfiguration additional configuration + * @return message id + */ + public static Publisher sendMessages( + SimpleQueueService self, + Publisher messageBodies, + @DelegatesTo(value = SendMessageBatchRequestEntry.Builder.class, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = FromString.class, options = "software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry.Builder") + Closure messageConfiguration + ) { + return self.sendMessages(self.getDefaultQueueName(), messageBodies, ConsumerWithDelegate.create(messageConfiguration)); + } + + /** + * Sends message with additional configuration into the given queue. + * + * @param queueName name of the queue + * @param messageBodies message bodies + * @param messageConfiguration additional configuration + * @return message id + */ + public static Publisher sendMessages( + SimpleQueueService self, + String queueName, + Publisher messageBodies, + @DelegatesTo(value = SendMessageRequest.Builder.class, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = FromString.class, options = "software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry.Builder") + Closure messageConfiguration + ) { + return self.sendMessages(queueName, messageBodies, ConsumerWithDelegate.create(messageConfiguration)); + } + } diff --git a/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/DefaultClient.java b/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/DefaultClient.java index 03b84d4e4..90e92de4d 100644 --- a/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/DefaultClient.java +++ b/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/DefaultClient.java @@ -19,6 +19,7 @@ import com.agorapulse.micronaut.amazon.awssdk.sqs.annotation.Queue; import com.agorapulse.micronaut.amazon.awssdk.sqs.annotation.QueueClient; +import org.reactivestreams.Publisher; @QueueClient // <1> interface DefaultClient { @@ -38,7 +39,11 @@ interface DefaultClient { String sendMessage(String record, int delay, String group); // <8> - void deleteMessage(String messageId); // <9> + void sendStringMessages(Publisher messages); // <9> + + Publisher sendMessages(Publisher messages); // <10> + + void deleteMessage(String messageId); // <11> String OTHER_QUEUE = "OtherQueue"; } diff --git a/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/QueueClientSpec.groovy b/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/QueueClientSpec.groovy index ab4816408..1b4ff2702 100644 --- a/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/QueueClientSpec.groovy +++ b/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/QueueClientSpec.groovy @@ -20,6 +20,8 @@ package com.agorapulse.micronaut.amazon.awssdk.sqs import com.fasterxml.jackson.databind.ObjectMapper import io.micronaut.context.ApplicationContext import io.micronaut.inject.qualifiers.Qualifiers +import org.reactivestreams.Publisher +import reactor.core.publisher.Flux import spock.lang.AutoCleanup import spock.lang.Specification @@ -146,6 +148,29 @@ class QueueClientSpec extends Specification { 1 * defaultService.sendMessage(DEFAULT_QUEUE_NAME, MESSAGE, DELAY, GROUP) >> ID } + void 'can send multiple messages when publisher is a parameter'() { + given: + List ids = [ID + 1, ID + 2, ID + 3] + DefaultClient client = context.getBean(DefaultClient) + + when: + Publisher messages = client.sendMessages(Flux.just(POGO, POGO, POGO)) + + then: + Flux.from(messages).collectList().block() == ids + + 1 * defaultService.sendMessages(DEFAULT_QUEUE_NAME, _ as Publisher, 0, null) >> Flux.just(ID + 1, ID + 2, ID + 3) + } + + void 'can send multiple string messages and return void'() { + given: + DefaultClient client = context.getBean(DefaultClient) + when: + client.sendStringMessages(Flux.just(MESSAGE, MESSAGE, MESSAGE)) + then: + 1 * defaultService.sendMessages(DEFAULT_QUEUE_NAME, _ as Publisher, 0, null) >> Flux.just(ID + 1, ID + 2, ID + 3) + } + void 'needs to follow the method convention rules'() { given: TestClient client = context.getBean(TestClient) diff --git a/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/SimpleQueueServiceSpec.groovy b/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/SimpleQueueServiceSpec.groovy index 984a36ddc..8aacfda20 100644 --- a/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/SimpleQueueServiceSpec.groovy +++ b/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/SimpleQueueServiceSpec.groovy @@ -19,6 +19,7 @@ package com.agorapulse.micronaut.amazon.awssdk.sqs import io.micronaut.context.annotation.Property import io.micronaut.test.extensions.spock.annotation.MicronautTest +import reactor.core.publisher.Flux import software.amazon.awssdk.services.sqs.model.Message import software.amazon.awssdk.services.sqs.model.QueueAttributeName import spock.lang.Retry @@ -74,6 +75,21 @@ class SimpleQueueServiceSpec extends Specification { then: !service.receiveMessages() + when: + List messageIds = Flux.from(service.sendMessages(Flux.just(DATA).repeat(11))).collectList().block() + and: + List firstBatch = service.receiveMessages(10) + then: + messageIds.size() == 12 + firstBatch.size() == 10 + + when: + firstBatch.forEach { m -> service.deleteMessage(m.receiptHandle()) } + and: + List secondBatch = service.receiveMessages(10) + then: + secondBatch.size() == 2 + when: service.deleteQueue(TEST_QUEUE) then: