Skip to content

Commit

Permalink
Merge pull request #145 from hgosansn/feature/SNS-support-for-fifo-to…
Browse files Browse the repository at this point in the history
…pics

[sc90134] [SNS] Add support for SNS Fifo topics
  • Loading branch information
DKarim authored Feb 2, 2023
2 parents 1799e15 + f0f5829 commit af5ad8d
Show file tree
Hide file tree
Showing 16 changed files with 399 additions and 41 deletions.
9 changes: 5 additions & 4 deletions docs/guide/src/docs/asciidoc/sns.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ include::{root-dir}/subprojects/micronaut-amazon-awssdk-sns/src/test/groovy/com/
<1> `@NotificationClient` annotation makes the interface a SNS client
<2> You can specify to which topic is the message published using `@Topic` annotation
<3> You can publish any object which can be converted into JSON.
<4> You can add additional subject to published message (only useful for few protocols, e.g. email)
<5> You can publish a string message
<6> You can send SMS using the word `SMS` in the name of the method. One argument must be phone number and its name must contain the word `number`
<7> You can provide additional attributes for the SMS message
<4> For FIFO Topics the annotations `@MessageGroupId` and `@MessageDeduplicationId` can be added on method parameters to forward these attributes when publishing
<5> You can add additional subject to published message (only useful for few protocols, e.g. email)
<6> You can publish a string message
<7> You can send SMS using the word `SMS` in the name of the method. One argument must be phone number and its name must contain the word `number`
<8> You can provide additional attributes for the SMS message

[source,java,indent=0,options="nowrap",role="secondary"]
.Publishing String Records (AWS SDK 1.x)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.CreatePlatformEndpointRequest;
import software.amazon.awssdk.services.sns.model.CreateTopicRequest;
import software.amazon.awssdk.services.sns.model.DeleteEndpointResponse;
import software.amazon.awssdk.services.sns.model.GetEndpointAttributesResponse;
import software.amazon.awssdk.services.sns.model.InvalidParameterException;
import software.amazon.awssdk.services.sns.model.ListTopicsResponse;
import software.amazon.awssdk.services.sns.model.MessageAttributeValue;
import software.amazon.awssdk.services.sns.model.NotFoundException;
import software.amazon.awssdk.services.sns.model.PublishRequest;
import software.amazon.awssdk.services.sns.model.SetEndpointAttributesResponse;
import software.amazon.awssdk.services.sns.model.Topic;

Expand Down Expand Up @@ -110,7 +112,9 @@ public String getDefaultTopicNameOrArn() {
@Override
public String createTopic(String topicName) {
LOGGER.debug("Creating topic sns with name " + topicName);
return client.createTopic(b -> b.name(topicName)).topicArn();
return client.createTopic((CreateTopicRequest.Builder b) ->
b.name(topicName).attributes(Collections.singletonMap("FifoTopic", Boolean.toString(SimpleNotificationService.isFifoTopic(topicName))))
).topicArn();
}

@Override
Expand Down Expand Up @@ -153,7 +157,19 @@ public void unsubscribeTopic(String arn) {

@Override
public String publishMessageToTopic(String topicArn, String subject, String message, Map<String, String> attributes) {
return client.publish(r -> r.topicArn(ensureTopicArn(topicArn)).message(message).subject(subject).messageAttributes(toAttributes(attributes))).messageId();
return client.publish(r -> r.topicArn(ensureTopicArn(topicArn))
.message(message)
.subject(subject)
.messageAttributes(toAttributes(attributes))
).messageId();
}

@Override
public String publishRequest(String topicArn, Map<String, String> attributes, PublishRequest.Builder publishRequestBuilder) {
return client.publish(publishRequestBuilder.topicArn(ensureTopicArn(topicArn))
.messageAttributes(toAttributes(attributes))
.build()
).messageId();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package com.agorapulse.micronaut.amazon.awssdk.sns;

import com.agorapulse.micronaut.amazon.awssdk.core.util.ConfigurationUtil;
import com.agorapulse.micronaut.amazon.awssdk.sns.annotation.MessageDeduplicationId;
import com.agorapulse.micronaut.amazon.awssdk.sns.annotation.MessageGroupId;
import com.agorapulse.micronaut.amazon.awssdk.sns.annotation.NotificationClient;
import com.agorapulse.micronaut.amazon.awssdk.sns.annotation.Topic;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand All @@ -30,6 +32,7 @@
import io.micronaut.core.util.StringUtils;
import io.micronaut.inject.qualifiers.Qualifiers;
import software.amazon.awssdk.services.sns.model.NotFoundException;
import software.amazon.awssdk.services.sns.model.PublishRequest;

import javax.inject.Singleton;
import java.util.Collections;
Expand All @@ -44,14 +47,18 @@ public class NotificationClientIntroduction implements MethodInterceptor<Object,
private static final String SUBJECT = "subject";
private static final String ATTRIBUTES = "attributes";
private static final String NUMBER = "number";
private static final String MESSAGE_GROUP_ID = "messageGroupId";
private static final String MESSAGE_DEDUPLICATION_ID = "messageDeduplicationId";

private static final Function<String, Optional<String>> EMPTY_IF_UNDEFINED = (String s) -> StringUtils.isEmpty(s) ? Optional.empty() : Optional.of(s);

private static class PublishingArguments {
Argument<?> message;
Argument<?> subject;
Argument<?> attributes;

Argument<?> messageGroupId;
Argument<?> messageDeduplicationId;

boolean isValid() {
return message != null;
}
Expand Down Expand Up @@ -151,19 +158,35 @@ private Object doIntercept(MethodInvocationContext<Object, Object> context, Simp
Object message = params.get(publishingArguments.message.getName());
Class<?> messageType = publishingArguments.message.getType();

String preparedMessage = "";
if (CharSequence.class.isAssignableFrom(messageType)) {
return service.publishMessageToTopic(topicName, subject, message.toString(), attributes);
preparedMessage = message.toString();
} else {
preparedMessage = toJsonMessage(message);
}

return publishJson(service, topicName, subject, message, attributes);
if (SimpleNotificationService.isFifoTopic(topicName)) {
PublishRequest.Builder publishRequestBuilder = PublishRequest.builder();
publishRequestBuilder.subject(subject);
publishRequestBuilder.message(preparedMessage);
if (publishingArguments.messageGroupId != null) {
publishRequestBuilder.messageGroupId((String) params.get(publishingArguments.messageGroupId.getName()));
}
if (publishingArguments.messageDeduplicationId != null) {
publishRequestBuilder.messageDeduplicationId((String) params.get(publishingArguments.messageDeduplicationId.getName()));
}
return service.publishRequest(topicName, attributes, publishRequestBuilder);
} else {
return service.publishMessageToTopic(topicName, subject, preparedMessage, attributes);
}
}

throw new UnsupportedOperationException("Cannot implement method " + context.getExecutableMethod());
}

private String publishJson(SimpleNotificationService service, String topic, String subject, Object message, Map<String, String> attributes) {
private String toJsonMessage(Object message) {
try {
return service.publishMessageToTopic(topic, subject, objectMapper.writeValueAsString(message), attributes);
return objectMapper.writeValueAsString(message);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Failed to marshal " + message + " to JSON", e);
}
Expand All @@ -182,6 +205,12 @@ private PublishingArguments findArguments(Argument[] arguments) {
names.attributes = argument;
continue;
}
if (argument.getName().equalsIgnoreCase(MESSAGE_GROUP_ID) || argument.isAnnotationPresent(MessageGroupId.class)) {
names.messageGroupId = argument;
}
if (argument.getName().equalsIgnoreCase(MESSAGE_DEDUPLICATION_ID) || argument.isAnnotationPresent(MessageDeduplicationId.class)) {
names.messageDeduplicationId = argument;
}
names.message = argument;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.reactivestreams.Publisher;
import software.amazon.awssdk.services.sns.model.MessageAttributeValue;
import software.amazon.awssdk.services.sns.model.PublishRequest;
import software.amazon.awssdk.services.sns.model.Topic;

import java.net.MalformedURLException;
Expand Down Expand Up @@ -47,6 +48,7 @@ public interface SimpleNotificationService {
String PLATFORM_TYPE_ANDROID = "GCM";
@Deprecated
String PLATFORM_TYPE_AMAZON = "ADM";
String FIFO_SUFFIX = ".fifo";

enum PlatformType {
ADM,
Expand Down Expand Up @@ -230,7 +232,7 @@ default String subscribeTopicWithFunction(String topicArn, String lambdaArn) {
* @param topicArn topic ARN or name
* @param subject subject of the message (ignored by most of the protocols)
* @param message the message
* @return the is of the message published
* @return the id of the message published
*/
default String publishMessageToTopic(String topicArn, String subject, String message) {
return publishMessageToTopic(topicArn, subject, message, Collections.emptyMap());
Expand All @@ -242,10 +244,18 @@ default String publishMessageToTopic(String topicArn, String subject, String mes
* @param subject subject of the message (ignored by most of the protocols)
* @param message the message
* @param attributes the message attributes
* @return the is of the message published
* @return the id of the message published
*/
String publishMessageToTopic(String topicArn, String subject, String message, Map<String, String> attributes);

/**
* Builds and Publishes a request into the topic
* @param topicArn topic ARN or name
* @param attributes the message attributes
* @return the id of the message published
*/
String publishRequest(String topicArn, Map<String, String> attributes, PublishRequest.Builder publishRequestBuilder);

/**
* Creates new platform application.
* @param name name of the application
Expand Down Expand Up @@ -860,4 +870,13 @@ default String validateDevice(String platform, String endpointArn, String device
default String sendSMSMessage(String phoneNumber, String message) {
return sendSMSMessage(phoneNumber, message, Collections.emptyMap());
}

/**
* Checks if the $topicName matches AWS requirements for FIFO topics
* @param topicName Name of the topic
* @return true if the $topicName ends with .fifo
*/
static boolean isFifoTopic(String topicName) {
return topicName.endsWith(FIFO_SUFFIX);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright 2018-2023 Agorapulse.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.agorapulse.micronaut.amazon.awssdk.sns.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Inherited
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.ANNOTATION_TYPE, ElementType.PARAMETER})
public @interface MessageDeduplicationId {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright 2018-2023 Agorapulse.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.agorapulse.micronaut.amazon.awssdk.sns.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Inherited
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.ANNOTATION_TYPE, ElementType.PARAMETER})
public @interface MessageGroupId {

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,29 @@
*/
package com.agorapulse.micronaut.amazon.awssdk.sns;

import com.agorapulse.micronaut.amazon.awssdk.sns.annotation.MessageDeduplicationId;
import com.agorapulse.micronaut.amazon.awssdk.sns.annotation.MessageGroupId;
import com.agorapulse.micronaut.amazon.awssdk.sns.annotation.NotificationClient;
import com.agorapulse.micronaut.amazon.awssdk.sns.annotation.Topic;

import java.util.Map;

@NotificationClient // <1>
@NotificationClient // <1>
interface DefaultClient {

String OTHER_TOPIC = "OtherTopic";

@Topic("OtherTopic") String publishMessageToDifferentTopic(Pogo pogo); // <2>
@Topic("OtherTopic") String publishMessageToDifferentTopic(Pogo pogo); // <2>

String publishMessage(Pogo message); // <3>
String publishMessage(String subject, Pogo message); // <4>
String publishMessage(Pogo message); // <3>
String publishMessage(Pogo message, @MessageGroupId String groupId, @MessageDeduplicationId String deduplicationId); // <4>
String publishMessage(String subject, Pogo message); // <5>
String publishMessage(String subject, Pogo message, Map<String, String> attributes);
String publishMessage(String message); // <5>
String publishMessage(String message); // <6>
String publishMessage(String subject, String message);
String publishMessage(String subject, String message, Map<String, String> attributes);

String sendSMS(String phoneNumber, String message); // <6>
String sendSms(String phoneNumber, String message, Map attributes); // <7>
String sendSMS(String phoneNumber, String message); // <7>
String sendSms(String phoneNumber, String message, Map attributes); // <8>

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import io.micronaut.context.ApplicationContext
import io.micronaut.inject.qualifiers.Qualifiers
import software.amazon.awssdk.services.sns.model.NotFoundException
import software.amazon.awssdk.services.sns.model.PublishRequest
import spock.lang.AutoCleanup
import spock.lang.Specification

Expand All @@ -39,6 +40,8 @@ class NotificationClientSpec extends Specification {
private static final Map EMPTY_MAP = Collections.emptyMap()
private static final String POGO_AS_JSON = new ObjectMapper().writeValueAsString(POGO)
private static final String MESSAGE_ID = '1234567890'
private static final String MESSAGE_GROUP_ID = 'messageGroupId1'
private static final String MESSAGE_DEDUPLICATION_ID = 'messageDeduplicationId1'

SimpleNotificationService defaultService = Mock(SimpleNotificationService) {
getDefaultTopicNameOrArn() >> DEFAULT_TOPIC
Expand Down Expand Up @@ -184,6 +187,25 @@ class NotificationClientSpec extends Specification {
1 * defaultService.publishMessageToTopic(StreamClient.SOME_STREAM, null, POGO_AS_JSON, EMPTY_MAP) >> MESSAGE_ID
}

void 'can publish to fifo topic'() {
given:
TestFifoClient client = context.getBean(TestFifoClient)

when:
String messageId = client.publishFifoMessage(POGO, MESSAGE_GROUP_ID, MESSAGE_DEDUPLICATION_ID)

then:
1 * testService.publishRequest(TestFifoClient.TOPIC_NAME, EMPTY_MAP, _) >> { String topicArn,
Map<String, String> attributes,
PublishRequest.Builder publishRequestBuilder ->
PublishRequest publishRequest = publishRequestBuilder.build()
assert publishRequest.messageGroupId() == MESSAGE_GROUP_ID
assert publishRequest.messageDeduplicationId() == MESSAGE_DEDUPLICATION_ID
return MESSAGE_ID
}
messageId == MESSAGE_ID
}

void 'wrong sms method format'() {
given:
StreamClient client = context.getBean(StreamClient)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright 2018-2023 Agorapulse.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.agorapulse.micronaut.amazon.awssdk.sns;

import com.agorapulse.micronaut.amazon.awssdk.sns.annotation.MessageDeduplicationId;
import com.agorapulse.micronaut.amazon.awssdk.sns.annotation.MessageGroupId;
import com.agorapulse.micronaut.amazon.awssdk.sns.annotation.NotificationClient;

@NotificationClient(value = "test", topic = TestFifoClient.TOPIC_NAME) interface TestFifoClient {

String TOPIC_NAME = "testFifoTopic.fifo";
String publishFifoMessage(Pogo message, @MessageGroupId String groupId, @MessageDeduplicationId String deduplicationId);
}
Loading

0 comments on commit af5ad8d

Please sign in to comment.