diff --git a/spring-pulsar-docs/src/main/antora/modules/ROOT/nav.adoc b/spring-pulsar-docs/src/main/antora/modules/ROOT/nav.adoc index c3ad5128..327cbb51 100644 --- a/spring-pulsar-docs/src/main/antora/modules/ROOT/nav.adoc +++ b/spring-pulsar-docs/src/main/antora/modules/ROOT/nav.adoc @@ -6,7 +6,9 @@ ** xref:intro/getting-help.adoc[Getting Help] * xref:reference/reference.adoc[] ** xref:reference/pulsar.adoc[] +*** xref:reference/tombstones.adoc[] ** xref:reference/reactive-pulsar.adoc[] +*** xref:reference/tombstones-reactive.adoc[] ** xref:reference/pulsar-admin.adoc[] ** xref:reference/pulsar-function.adoc[] ** xref:reference/observability.adoc[] diff --git a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/reactive-pulsar.adoc b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/reactive-pulsar.adoc index 69a16738..31820bdd 100644 --- a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/reactive-pulsar.adoc +++ b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/reactive-pulsar.adoc @@ -337,6 +337,7 @@ One advantage of the reactive concurrency model is that it can be used with `Exc The Pulsar message metadata can be consumed as Spring message headers. The list of available headers can be found in {github}/blob/main/spring-pulsar/src/main/java/org/springframework/pulsar/support/PulsarHeaders.java[PulsarHeaders.java]. +[[reactive-pulsar-headers.single]] ==== Accessing In OneByOne Listener The following example shows how you can access Pulsar Headers when using a one-by-one message listener: @@ -359,6 +360,7 @@ When doing so, the user can directly call the corresponding methods on the Pulsa However, as a convenience, you can also retrieve it by using the `Header` annotation. Note that you can also use the Spring messaging `Message` envelope to carry the payload and then retrieve the Pulsar headers by using `@Header`. +[[reactive-pulsar-headers.streaming]] ==== Accessing In Streaming Listener When using a streaming message listener the header support is limited. Only when the `Flux` contains Spring `org.springframework.messaging.Message` elements will the headers be populated. diff --git a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/tombstones-reactive.adoc b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/tombstones-reactive.adoc new file mode 100644 index 00000000..93ba86a7 --- /dev/null +++ b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/tombstones-reactive.adoc @@ -0,0 +1,65 @@ +[[tombstones-reactive]] += Null Payloads and Log Compaction of 'Tombstone' Records + +When using log compaction, you can send and receive messages with `null` payloads to identify the deletion of a key. +You can also receive `null` values for other reasons, such as a deserializer that might return `null` when it cannot deserialize a value. + +[[tombstones-reactive.produce]] +== Producing Null Payloads +You can send a `null` value with the `ReactivePulsarTemplate` by passing a `null` message parameter value to one of the `send` methods, for example: +[source, java] +---- +reactiveTemplate + .send(null, Schema.STRING) + .subscribe(); +---- +NOTE: When sending null values you must specify the schema type as the system can not determine the type of the message from a `null` payload. + +[[tombstones-reactive.consume]] +== Consuming Null Payloads +For `@ReactivePularListener`, the `null` payload is passed into the listener method based on the type of its message parameter as follows: +|=== +| Parameter type | Passed-in value + +| primitive +| `null` + +| user-defined +| `null` + +| `org.apache.pulsar.client.api.Message` +| non-null Pulsar message whose `getValue()` returns `null` + +| `org.springframework.messaging.Message` +| non-null Spring message whose `getPayload()` returns `PulsarNull` + +| `Flux>` +| non-null flux whose entries are non-null Pulsar messages whose `getValue()` returns `null` + +| `Flux>` +| non-null flux whose entries are non-null Spring messages whose `getPayload()` returns `PulsarNull` + +|=== + +IMPORTANT: When the passed-in value is `null` (ie. single record listeners with primitive or user-defined types) you must use the `@Payload` parameter annotation with `required = false`. + +IMPORTANT: When using the Spring `org.springframework.messaging.Message` for your listener payload type, its generic type information must be wide enough to accept `Message` (eg. `Message`, `Message`, or `Message`). +This is due to the fact that the Spring Message does not allow null values for its payload and instead uses the `PulsarNull` placeholder. + +If it is a tombstone message for a compacted log, you usually also need the key so that your application can determine which key was +++"+++`deleted`+++"+++. +The following example shows such a configuration: + +[source, java] +---- +@ReactivePulsarListener( + topics = "my-topic", + subscriptionName = "my-topic-sub", + schemaType = SchemaType.STRING) +Mono myListener( + @Payload(required = false) String msg, + @Header(PulsarHeaders.KEY) String key) { + ... +} +---- + +NOTE: When using a streaming message listener (`Flux`) the xref:reference/reactive-pulsar.adoc#reactive-pulsar-headers.streaming[header support is limited], so it less useful in the log compaction scenario. diff --git a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/tombstones.adoc b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/tombstones.adoc new file mode 100644 index 00000000..cb77cb78 --- /dev/null +++ b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/tombstones.adoc @@ -0,0 +1,68 @@ +[[tombstones]] += Null Payloads and Log Compaction of 'Tombstone' Records + +When using log compaction, you can send and receive messages with `null` payloads to identify the deletion of a key. +You can also receive `null` values for other reasons, such as a deserializer that might return `null` when it cannot deserialize a value. + +[[tombstones.produce]] +== Producing Null Payloads +To send a `null` payload by using the `PulsarTemplate`, you can use the fluent API and pass null into the value argument of the `newMessage()` method, for example: +[source, java] +---- +pulsarTemplate + .newMessage(null) + .withTopic("my-topic") + .withSchema(Schema.STRING) + .withMessageCustomizer((mb) -> mb.key("key:1234")) + .send(); +---- +NOTE: When sending null values you must specify the schema type as the system can not determine the type of the message from a `null` payload. + +[[tombstones.consume]] +== Consuming Null Payloads +For `@PulsarListener` and `@PulsarReader`, the `null` payload is passed into the listener method based on the type of its message parameter as follows: +|=== +| Parameter type | Passed-in value + +| primitive +| `null` + +| user-defined +| `null` + +| `org.apache.pulsar.client.api.Message` +| non-null Pulsar message whose `getValue()` returns `null` + +| `org.springframework.messaging.Message` +| non-null Spring message whose `getPayload()` returns `PulsarNull` + +| `List` +| non-null list whose entries (`X`) are one of the above types and act accordingly (ie. primitive entries are `null` etc..) + +| `org.apache.pulsar.client.api.Messages` +| non-null container of non-null Pulsar messages whose `getValue()` returns `null` + +|=== + +IMPORTANT: When the passed-in value is `null` (ie. single record listeners with primitive or user-defined types) you must use the `@Payload` parameter annotation with `required = false`. + +IMPORTANT: When using the Spring `org.springframework.messaging.Message` for your listener payload type, its generic type information must be wide enough to accept `Message` (eg. `Message`, `Message`, or `Message`). +This is due to the fact that the Spring Message does not allow null values for its payload and instead uses the `PulsarNull` placeholder. + +If it is a tombstone message for a compacted log, you usually also need the key so that your application can determine which key was +++"+++`deleted`+++"+++. +The following example shows such a configuration: + +[source, java] +---- +@PulsarListener( + topics = "my-topic", + subscriptionName = "my-topic-sub", + schemaType = SchemaType.STRING) +void myListener( + @Payload(required = false) String msg, + @Header(PulsarHeaders.KEY) String key) { + ... +} +---- + +NOTE: The `@PulsarReader` does not yet support `@Header` arguments, so it is less useful in the log compaction scenario. diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java index 4534f68b..d39386ff 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java @@ -174,8 +174,8 @@ private ResolvableType resolvableType(MethodParameter methodParameter) { if (rawClass != null && isContainerType(rawClass)) { resolvableType = resolvableType.getGeneric(0); } - if (Message.class.isAssignableFrom(resolvableType.getRawClass()) - || org.springframework.messaging.Message.class.isAssignableFrom(resolvableType.getRawClass())) { + if (resolvableType.getRawClass() != null && (Message.class.isAssignableFrom(resolvableType.getRawClass()) + || org.springframework.messaging.Message.class.isAssignableFrom(resolvableType.getRawClass()))) { resolvableType = resolvableType.getGeneric(0); } return resolvableType; diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java index d904d9c4..bb8a6e14 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java @@ -33,8 +33,6 @@ import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; @@ -44,28 +42,22 @@ import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.schema.SchemaType; -import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory; import org.apache.pulsar.reactive.client.api.MessageResult; import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer; import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec; -import org.apache.pulsar.reactive.client.api.ReactivePulsarClient; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.handler.annotation.Header; import org.springframework.pulsar.annotation.EnablePulsar; -import org.springframework.pulsar.core.DefaultPulsarClientFactory; import org.springframework.pulsar.core.DefaultPulsarProducerFactory; import org.springframework.pulsar.core.DefaultSchemaResolver; import org.springframework.pulsar.core.DefaultTopicResolver; -import org.springframework.pulsar.core.PulsarAdministration; import org.springframework.pulsar.core.PulsarProducerFactory; import org.springframework.pulsar.core.PulsarTemplate; -import org.springframework.pulsar.core.PulsarTopic; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory; @@ -74,18 +66,16 @@ import org.springframework.pulsar.reactive.config.annotation.EnableReactivePulsar; import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener; import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListenerMessageConsumerBuilderCustomizer; -import org.springframework.pulsar.reactive.core.DefaultReactivePulsarConsumerFactory; import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer; import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.BasicListenersTestCases.BasicListenersTestCasesConfig; import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersTest.PulsarListenerWithHeadersConfig; import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SchemaCustomMappingsTestCases.SchemaCustomMappingsTestConfig.User2; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.StreamingListenerTestCases.StreamingListenerTestCasesConfig; import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.WithDefaultType.WithDefaultTypeConfig; import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.WithSpecificTypes.WithSpecificTypesConfig; import org.springframework.pulsar.support.PulsarHeaders; -import org.springframework.pulsar.test.support.PulsarTestContainerSupport; -import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import org.springframework.test.util.ReflectionTestUtils; import org.springframework.util.ObjectUtils; @@ -98,77 +88,11 @@ * @author Christophe Bornet * @author Chris Bono */ -@SpringJUnitConfig -@DirtiesContext -public class ReactivePulsarListenerTests implements PulsarTestContainerSupport { - - @Autowired - PulsarTemplate pulsarTemplate; - - @Autowired - private PulsarClient pulsarClient; - - @Configuration(proxyBeanMethods = false) - @EnableReactivePulsar - public static class TopLevelConfig { - - @Bean - public PulsarProducerFactory pulsarProducerFactory(PulsarClient pulsarClient) { - return new DefaultPulsarProducerFactory<>(pulsarClient); - } - - @Bean - public PulsarClient pulsarClient() throws PulsarClientException { - return new DefaultPulsarClientFactory(PulsarTestContainerSupport.getPulsarBrokerUrl()).createClient(); - } - - @Bean - public ReactivePulsarClient pulsarReactivePulsarClient(PulsarClient pulsarClient) { - return AdaptedReactivePulsarClientFactory.create(pulsarClient); - } - - @Bean - public PulsarTemplate pulsarTemplate(PulsarProducerFactory pulsarProducerFactory) { - return new PulsarTemplate<>(pulsarProducerFactory); - } - - @SuppressWarnings("unchecked") - @Bean - public ConsumerTrackingReactivePulsarConsumerFactory pulsarConsumerFactory( - ReactivePulsarClient pulsarClient, - ObjectProvider> defaultConsumerCustomizersProvider) { - DefaultReactivePulsarConsumerFactory consumerFactory = new DefaultReactivePulsarConsumerFactory<>( - pulsarClient, defaultConsumerCustomizersProvider.orderedStream().toList()); - return new ConsumerTrackingReactivePulsarConsumerFactory<>(consumerFactory); - } - - @Bean - ReactivePulsarListenerContainerFactory reactivePulsarListenerContainerFactory( - ReactivePulsarConsumerFactory pulsarConsumerFactory) { - return new DefaultReactivePulsarListenerContainerFactory<>(pulsarConsumerFactory, - new ReactivePulsarContainerProperties<>()); - } - - @Bean - PulsarAdministration pulsarAdministration() { - return new PulsarAdministration(PulsarTestContainerSupport.getHttpServiceUrl()); - } - - @Bean - PulsarTopic partitionedTopic() { - return PulsarTopic.builder("persistent://public/default/concurrency-on-pl").numberOfPartitions(3).build(); - } - - @Bean - ReactivePulsarListenerMessageConsumerBuilderCustomizer subscriptionInitialPositionEarliest() { - return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest); - } - - } +class ReactivePulsarListenerTests extends ReactivePulsarListenerTestsBase { @Nested - @ContextConfiguration(classes = PulsarListenerBasicTestCases.TestPulsarListenersForBasicScenario.class) - class PulsarListenerBasicTestCases { + @ContextConfiguration(classes = BasicListenersTestCasesConfig.class) + class BasicListenersTestCases { static CountDownLatch latch1 = new CountDownLatch(1); static CountDownLatch latch2 = new CountDownLatch(1); @@ -212,7 +136,7 @@ void testPulsarListenerWithTopicsPattern() throws Exception { @EnableReactivePulsar @Configuration - static class TestPulsarListenersForBasicScenario { + static class BasicListenersTestCasesConfig { @ReactivePulsarListener(id = "id-1", topics = "topic-1", subscriptionName = "subscription-1", consumerCustomizer = "listen1Customizer") @@ -256,8 +180,8 @@ ReactivePulsarListenerMessageConsumerBuilderCustomizer listen3Customizer } @Nested - @ContextConfiguration(classes = PulsarListenerStreamingTestCases.TestPulsarListenersForStreaming.class) - class PulsarListenerStreamingTestCases { + @ContextConfiguration(classes = StreamingListenerTestCasesConfig.class) + class StreamingListenerTestCases { static CountDownLatch latch1 = new CountDownLatch(10); static CountDownLatch latch2 = new CountDownLatch(10); @@ -280,7 +204,7 @@ void testPulsarListenerStreamingSpringMessage() throws Exception { @EnableReactivePulsar @Configuration - static class TestPulsarListenersForStreaming { + static class StreamingListenerTestCasesConfig { @ReactivePulsarListener(topics = "streaming-1", stream = true, consumerCustomizer = "subscriptionInitialPositionEarliest") diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTestsBase.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTestsBase.java new file mode 100644 index 00000000..06560b61 --- /dev/null +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTestsBase.java @@ -0,0 +1,120 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.reactive.listener; + +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory; +import org.apache.pulsar.reactive.client.api.ReactivePulsarClient; + +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.pulsar.core.DefaultPulsarClientFactory; +import org.springframework.pulsar.core.DefaultPulsarProducerFactory; +import org.springframework.pulsar.core.PulsarAdministration; +import org.springframework.pulsar.core.PulsarProducerFactory; +import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.pulsar.core.PulsarTopic; +import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory; +import org.springframework.pulsar.reactive.config.ReactivePulsarListenerContainerFactory; +import org.springframework.pulsar.reactive.config.annotation.EnableReactivePulsar; +import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener; +import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListenerMessageConsumerBuilderCustomizer; +import org.springframework.pulsar.reactive.core.DefaultReactivePulsarConsumerFactory; +import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer; +import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.ConsumerTrackingReactivePulsarConsumerFactory; +import org.springframework.pulsar.test.support.PulsarTestContainerSupport; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * Provides base support for {@link ReactivePulsarListener @ReactivePulsarListener} tests. + * + * @author Chris Bono + */ +@SpringJUnitConfig +@DirtiesContext +abstract class ReactivePulsarListenerTestsBase implements PulsarTestContainerSupport { + + @Autowired + protected PulsarTemplate pulsarTemplate; + + @Autowired + protected PulsarClient pulsarClient; + + @Configuration(proxyBeanMethods = false) + @EnableReactivePulsar + public static class TopLevelConfig { + + @Bean + PulsarProducerFactory pulsarProducerFactory(PulsarClient pulsarClient) { + return new DefaultPulsarProducerFactory<>(pulsarClient); + } + + @Bean + PulsarClient pulsarClient() throws PulsarClientException { + return new DefaultPulsarClientFactory(PulsarTestContainerSupport.getPulsarBrokerUrl()).createClient(); + } + + @Bean + ReactivePulsarClient pulsarReactivePulsarClient(PulsarClient pulsarClient) { + return AdaptedReactivePulsarClientFactory.create(pulsarClient); + } + + @Bean + PulsarTemplate pulsarTemplate(PulsarProducerFactory pulsarProducerFactory) { + return new PulsarTemplate<>(pulsarProducerFactory); + } + + @SuppressWarnings("unchecked") + @Bean + ConsumerTrackingReactivePulsarConsumerFactory pulsarConsumerFactory(ReactivePulsarClient pulsarClient, + ObjectProvider> defaultConsumerCustomizersProvider) { + DefaultReactivePulsarConsumerFactory consumerFactory = new DefaultReactivePulsarConsumerFactory<>( + pulsarClient, defaultConsumerCustomizersProvider.orderedStream().toList()); + return new ConsumerTrackingReactivePulsarConsumerFactory<>(consumerFactory); + } + + @Bean + ReactivePulsarListenerContainerFactory reactivePulsarListenerContainerFactory( + ReactivePulsarConsumerFactory pulsarConsumerFactory) { + return new DefaultReactivePulsarListenerContainerFactory<>(pulsarConsumerFactory, + new ReactivePulsarContainerProperties<>()); + } + + @Bean + PulsarAdministration pulsarAdministration() { + return new PulsarAdministration(PulsarTestContainerSupport.getHttpServiceUrl()); + } + + @Bean + PulsarTopic partitionedTopic() { + return PulsarTopic.builder("persistent://public/default/concurrency-on-pl").numberOfPartitions(3).build(); + } + + @Bean + ReactivePulsarListenerMessageConsumerBuilderCustomizer subscriptionInitialPositionEarliest() { + return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest); + } + + } + +} diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTombstoneTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTombstoneTests.java new file mode 100644 index 00000000..291eefc5 --- /dev/null +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTombstoneTests.java @@ -0,0 +1,359 @@ +/* + * Copyright 2022-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.reactive.listener; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.reactive.client.api.MessageResult; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.Message; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.pulsar.core.DefaultPulsarProducerFactory; +import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTombstoneTests.PulsarMessagePayload.PulsarMessagePayloadConfig; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTombstoneTests.SingleComplexPayload.SingleComplexPayloadConfig; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTombstoneTests.SinglePrimitivePayload.SinglePrimitivePayloadConfig; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTombstoneTests.SpringMessagePayload.SpringMessagePayloadConfig; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTombstoneTests.StreamingPulsarMessagePayload.StreamingPulsarMessagePayloadConfig; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTombstoneTests.StreamingSpringMessagePayload.StreamingSpringMessagePayloadConfig; +import org.springframework.pulsar.support.PulsarHeaders; +import org.springframework.pulsar.support.PulsarNull; +import org.springframework.test.context.ContextConfiguration; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Tests consuming records (including {@link PulsarNull tombstones}) in + * {@link ReactivePulsarListener @ReactivePulsarListener}. + * + * @author Chris Bono + */ +class ReactivePulsarListenerTombstoneTests extends ReactivePulsarListenerTestsBase { + + static void sendTestMessages(PulsarTemplate pulsarTemplate, String topic, Schema schema, + Function payloadFactory) throws PulsarClientException { + pulsarTemplate.newMessage(payloadFactory.apply("foo")) + .withTopic(topic) + .withMessageCustomizer((mb) -> mb.key("key:foo")) + .send(); + pulsarTemplate.newMessage(null) + .withTopic(topic) + .withSchema(schema) + .withMessageCustomizer((mb) -> mb.key("key:null")) + .send(); + pulsarTemplate.newMessage(payloadFactory.apply("bar")) + .withTopic(topic) + .withMessageCustomizer((mb) -> mb.key("key:bar")) + .send(); + } + + static void assertMessagesReceivedWithHeaders(List> receivedMessages, + Function payloadFactory) { + assertThat(receivedMessages).containsExactly(new ReceivedMessage<>(payloadFactory.apply("foo"), "key:foo"), + new ReceivedMessage<>(null, "key:null"), new ReceivedMessage<>(payloadFactory.apply("bar"), "key:bar")); + } + + static void assertMessagesReceivedWithoutHeaders(List> receivedMessages, + Function payloadFactory) { + assertThat(receivedMessages).containsExactly(new ReceivedMessage<>(payloadFactory.apply("foo"), null), + new ReceivedMessage<>(null, null), new ReceivedMessage<>(payloadFactory.apply("bar"), null)); + } + + @Nested + @ContextConfiguration(classes = PulsarMessagePayloadConfig.class) + class PulsarMessagePayload { + + private static final String TOPIC = "rpltt-pulsar-msg-topic"; + + static CountDownLatch latchWithHeaders = new CountDownLatch(3); + static CountDownLatch latchWithoutHeaders = new CountDownLatch(3); + static List> receivedMessagesWithHeaders = new ArrayList<>(); + static List> receivedMessagesWithoutHeaders = new ArrayList<>(); + + @Test + void shouldReceiveMessagesWithTombstone() throws Exception { + sendTestMessages(pulsarTemplate, TOPIC, Schema.STRING, Function.identity()); + assertThat(latchWithHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(latchWithoutHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertMessagesReceivedWithHeaders(receivedMessagesWithHeaders, Function.identity()); + assertMessagesReceivedWithoutHeaders(receivedMessagesWithoutHeaders, Function.identity()); + } + + @Configuration(proxyBeanMethods = false) + static class PulsarMessagePayloadConfig { + + @ReactivePulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub-headers", + schemaType = SchemaType.STRING, consumerCustomizer = "subscriptionInitialPositionEarliest") + Mono listenWithHeaders(org.apache.pulsar.client.api.Message msg, + @Header(PulsarHeaders.KEY) String key) { + receivedMessagesWithHeaders.add(new ReceivedMessage<>(msg.getValue(), key)); + latchWithHeaders.countDown(); + return Mono.empty(); + } + + @ReactivePulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub-no-headers", + schemaType = SchemaType.STRING, consumerCustomizer = "subscriptionInitialPositionEarliest") + Mono listenWithoutHeaders(org.apache.pulsar.client.api.Message msg) { + receivedMessagesWithoutHeaders.add(new ReceivedMessage<>(msg.getValue(), null)); + latchWithoutHeaders.countDown(); + return Mono.empty(); + } + + } + + } + + @Nested + @ContextConfiguration(classes = SpringMessagePayloadConfig.class) + class SpringMessagePayload { + + private static final String TOPIC = "rpltt-spring-msg-topic"; + + static CountDownLatch latchWithHeaders = new CountDownLatch(3); + static CountDownLatch latchWithoutHeaders = new CountDownLatch(3); + static List> receivedMessagesWithHeaders = new ArrayList<>(); + static List> receivedMessagesWithoutHeaders = new ArrayList<>(); + + @Test + void shouldReceiveMessagesWithTombstone() throws Exception { + sendTestMessages(pulsarTemplate, TOPIC, Schema.STRING, Function.identity()); + assertThat(latchWithHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(latchWithoutHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertMessagesReceivedWithHeaders(receivedMessagesWithHeaders, Function.identity()); + assertMessagesReceivedWithoutHeaders(receivedMessagesWithoutHeaders, Function.identity()); + } + + @Configuration(proxyBeanMethods = false) + static class SpringMessagePayloadConfig { + + @ReactivePulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub-headers", + schemaType = SchemaType.STRING, consumerCustomizer = "subscriptionInitialPositionEarliest") + Mono listenWithHeaders(Message msg, @Header(PulsarHeaders.KEY) String key) { + var payload = (msg.getPayload() != PulsarNull.INSTANCE) ? msg.getPayload().toString() : null; + receivedMessagesWithHeaders.add(new ReceivedMessage<>(payload, key)); + latchWithHeaders.countDown(); + return Mono.empty(); + } + + @ReactivePulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub-no-headers", + schemaType = SchemaType.STRING, consumerCustomizer = "subscriptionInitialPositionEarliest") + Mono listenWithoutHeaders(Message msg) { + var payload = (msg.getPayload() != PulsarNull.INSTANCE) ? msg.getPayload().toString() : null; + receivedMessagesWithoutHeaders.add(new ReceivedMessage<>(payload, null)); + latchWithoutHeaders.countDown(); + return Mono.empty(); + } + + } + + } + + @Nested + @ContextConfiguration(classes = SinglePrimitivePayloadConfig.class) + class SinglePrimitivePayload { + + private static final String TOPIC = "rpltt-single-primitive-topic"; + + static CountDownLatch latchWithHeaders = new CountDownLatch(3); + static CountDownLatch latchWithoutHeaders = new CountDownLatch(3); + static List> receivedMessagesWithHeaders = new ArrayList<>(); + static List> receivedMessagesWithoutHeaders = new ArrayList<>(); + + @Test + void shouldReceiveMessagesWithTombstone() throws Exception { + sendTestMessages(pulsarTemplate, TOPIC, Schema.STRING, Function.identity()); + assertThat(latchWithHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(latchWithoutHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertMessagesReceivedWithHeaders(receivedMessagesWithHeaders, Function.identity()); + assertMessagesReceivedWithoutHeaders(receivedMessagesWithoutHeaders, Function.identity()); + } + + @Configuration(proxyBeanMethods = false) + static class SinglePrimitivePayloadConfig { + + @ReactivePulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub-headers", + schemaType = SchemaType.STRING, consumerCustomizer = "subscriptionInitialPositionEarliest") + Mono listenWithHeaders(@Payload(required = false) String msg, @Header(PulsarHeaders.KEY) String key) { + receivedMessagesWithHeaders.add(new ReceivedMessage<>(msg, key)); + latchWithHeaders.countDown(); + return Mono.empty(); + } + + @ReactivePulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub-no-headers", + schemaType = SchemaType.STRING, consumerCustomizer = "subscriptionInitialPositionEarliest") + Mono listenWithoutHeaders(@Payload(required = false) String msg) { + receivedMessagesWithoutHeaders.add(new ReceivedMessage<>(msg, null)); + latchWithoutHeaders.countDown(); + return Mono.empty(); + } + + } + + } + + @Nested + @ContextConfiguration(classes = SingleComplexPayloadConfig.class) + class SingleComplexPayload { + + private static final String TOPIC = "rpltt-single-complex-topic"; + + static CountDownLatch latchWithHeaders = new CountDownLatch(3); + static CountDownLatch latchWithoutHeaders = new CountDownLatch(3); + static List> receivedMessagesWithHeaders = new ArrayList<>(); + static List> receivedMessagesWithoutHeaders = new ArrayList<>(); + + @Test + void shouldReceiveMessagesWithTombstone() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var fooPulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory); + sendTestMessages(fooPulsarTemplate, TOPIC, Schema.JSON(Foo.class), Foo::new); + assertThat(latchWithHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(latchWithoutHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertMessagesReceivedWithHeaders(receivedMessagesWithHeaders, Foo::new); + assertMessagesReceivedWithoutHeaders(receivedMessagesWithoutHeaders, Foo::new); + } + + @Configuration(proxyBeanMethods = false) + static class SingleComplexPayloadConfig { + + @ReactivePulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub-headers", + schemaType = SchemaType.JSON, consumerCustomizer = "subscriptionInitialPositionEarliest") + Mono listenWithHeaders(@Payload(required = false) Foo msg, @Header(PulsarHeaders.KEY) String key) { + latchWithHeaders.countDown(); + receivedMessagesWithHeaders.add(new ReceivedMessage<>(msg, key)); + return Mono.empty(); + } + + @ReactivePulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub-no-headers", + schemaType = SchemaType.JSON, consumerCustomizer = "subscriptionInitialPositionEarliest") + Mono listenWithoutHeaders(@Payload(required = false) Foo msg) { + latchWithoutHeaders.countDown(); + receivedMessagesWithoutHeaders.add(new ReceivedMessage<>(msg, null)); + return Mono.empty(); + } + + } + + } + + @Nested + @ContextConfiguration(classes = StreamingPulsarMessagePayloadConfig.class) + class StreamingPulsarMessagePayload { + + private static final String TOPIC = "rpltt-multi-pulsar-msg-topic"; + + static CountDownLatch latchWithoutHeaders = new CountDownLatch(3); + static List> receivedMessagesWithoutHeaders = new ArrayList<>(); + + @Test + void shouldReceiveMessagesWithTombstone() throws Exception { + sendTestMessages(pulsarTemplate, TOPIC, Schema.STRING, Function.identity()); + assertThat(latchWithoutHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertMessagesReceivedWithoutHeaders(receivedMessagesWithoutHeaders, Function.identity()); + } + + @Configuration(proxyBeanMethods = false) + static class StreamingPulsarMessagePayloadConfig { + + @ReactivePulsarListener(topics = TOPIC, stream = true, schemaType = SchemaType.STRING, + consumerCustomizer = "subscriptionInitialPositionEarliest") + Flux> listenWithoutHeaders( + Flux> messages) { + return messages.doOnNext(m -> { + receivedMessagesWithoutHeaders.add(new ReceivedMessage<>(m.getValue(), null)); + latchWithoutHeaders.countDown(); + }).map(MessageResult::acknowledge); + } + + } + + } + + @Nested + @ContextConfiguration(classes = StreamingSpringMessagePayloadConfig.class) + class StreamingSpringMessagePayload { + + private static final String TOPIC = "rpltt-multi-spring-msg-topic"; + + static CountDownLatch latchWithHeaders = new CountDownLatch(3); + static List> receivedMessagesWithHeaders = new ArrayList<>(); + + @Test + void shouldReceiveMessagesWithTombstone() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var fooPulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory); + sendTestMessages(fooPulsarTemplate, TOPIC, Schema.JSON(Foo.class), Foo::new); + assertThat(latchWithHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertMessagesReceivedWithHeaders(receivedMessagesWithHeaders, Foo::new); + } + + @SuppressWarnings("rawtypes") + @Configuration(proxyBeanMethods = false) + static class StreamingSpringMessagePayloadConfig { + + @ReactivePulsarListener(topics = TOPIC, stream = true, schemaType = SchemaType.JSON, + consumerCustomizer = "subscriptionInitialPositionEarliest") + Flux> listenWithHeaders(Flux> messages) { + return messages.doOnNext(m -> { + Object payload = m.getPayload(); + if (payload == PulsarNull.INSTANCE) { + payload = null; + } + else if (payload instanceof Map payloadFields) { + payload = new Foo((String) payloadFields.get("value")); + } + var keyHeader = (String) m.getHeaders().get(PulsarHeaders.KEY); + receivedMessagesWithHeaders.add(new ReceivedMessage<>(payload, keyHeader)); + latchWithHeaders.countDown(); + }).map(m -> this.messageIdFrom(m)).map(MessageResult::acknowledge); + } + + private MessageId messageIdFrom(Message springMessage) { + if (springMessage.getHeaders().get(PulsarHeaders.MESSAGE_ID) instanceof MessageId msgId) { + return msgId; + } + throw new RuntimeException("Spring Message missing '%s' header".formatted(PulsarHeaders.MESSAGE_ID)); + } + + } + + } + + record Foo(String value) { + } + + record ReceivedMessage(T payload, String keyHeader) { + } + +} diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/AbstractPulsarAnnotationsBeanPostProcessor.java b/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/AbstractPulsarAnnotationsBeanPostProcessor.java index 4d7c3513..c5df6202 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/AbstractPulsarAnnotationsBeanPostProcessor.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/AbstractPulsarAnnotationsBeanPostProcessor.java @@ -326,9 +326,9 @@ private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() { GenericMessageConverter messageConverter = new GenericMessageConverter( this.defaultFormattingConversionService); defaultFactory.setMessageConverter(messageConverter); - + defaultFactory + .setCustomArgumentResolvers(List.of(new PulsarNullAwarePayloadArgumentResolver(messageConverter))); defaultFactory.afterPropertiesSet(); - return defaultFactory; } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarNullAwarePayloadArgumentResolver.java b/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarNullAwarePayloadArgumentResolver.java new file mode 100644 index 00000000..626f43e0 --- /dev/null +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarNullAwarePayloadArgumentResolver.java @@ -0,0 +1,63 @@ +/* + * Copyright 2023-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.annotation; + +import java.util.List; + +import org.springframework.core.MethodParameter; +import org.springframework.messaging.Message; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.pulsar.support.PulsarNull; + +/** + * A {@link PayloadMethodArgumentResolver} that can properly decode {@link PulsarNull} + * payloads into null. + * + * @author Chris Bono + * @since 1.0.1 + */ +public class PulsarNullAwarePayloadArgumentResolver extends PayloadMethodArgumentResolver { + + PulsarNullAwarePayloadArgumentResolver(MessageConverter messageConverter) { + super(messageConverter); + } + + @Override + public Object resolveArgument(MethodParameter parameter, Message message) throws Exception { + if (message == null) { + message = new GenericMessage<>(PulsarNull.INSTANCE); + } + Object resolved = super.resolveArgument(parameter, message); + // Replace 'PulsarNull' elements w/ 'null' + if (resolved instanceof List list) { + for (int i = 0; i < list.size(); i++) { + if (list.get(i) instanceof PulsarNull) { + list.set(i, null); + } + } + } + return resolved; + } + + @Override + protected boolean isEmptyPayload(Object payload) { + return payload == null || payload instanceof PulsarNull; + } + +} diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarReaderAnnotationBeanPostProcessor.java b/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarReaderAnnotationBeanPostProcessor.java index 37bbfa37..78f3f824 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarReaderAnnotationBeanPostProcessor.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarReaderAnnotationBeanPostProcessor.java @@ -36,7 +36,11 @@ import org.springframework.context.ApplicationContext; import org.springframework.core.MethodIntrospector; import org.springframework.core.annotation.AnnotatedElementUtils; +import org.springframework.core.convert.converter.Converter; +import org.springframework.core.convert.converter.GenericConverter; import org.springframework.core.log.LogAccessor; +import org.springframework.format.Formatter; +import org.springframework.format.FormatterRegistry; import org.springframework.lang.Nullable; import org.springframework.pulsar.config.MethodPulsarReaderEndpoint; import org.springframework.pulsar.config.PulsarAnnotationSupportBeanNames; @@ -120,6 +124,7 @@ public void afterSingletonsInstantiated() { if (this.defaultContainerFactoryBeanName != null) { this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName); } + addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService); postProcessEndpointsBeforeRegistration(); // Register all readers this.registrar.afterPropertiesSet(); @@ -292,4 +297,10 @@ private Set findReaderAnnotations(Method method) { return readers; } + private void addFormatters(FormatterRegistry registry) { + this.beanFactory.getBeanProvider(Converter.class).forEach(registry::addConverter); + this.beanFactory.getBeanProvider(GenericConverter.class).forEach(registry::addConverter); + this.beanFactory.getBeanProvider(Formatter.class).forEach(registry::addFormatter); + } + } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java index 8e4dde68..885c0835 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java @@ -119,9 +119,6 @@ protected AbstractPulsarMessageToSpringMessageAdapter createMessageListener( HandlerAdapter handlerMethod = configureListenerAdapter(messageListener); messageListener.setHandlerMethod(handlerMethod); - // Since we have access to the handler method here, check if we can type infer the - // Schema used. - // TODO: filter out the payload type by excluding Consumer, Message, Messages etc. MethodParameter[] methodParameters = handlerMethod.getInvokerHandlerMethod().getMethodParameters(); @@ -179,8 +176,8 @@ private ResolvableType resolvableType(MethodParameter methodParameter) { if (rawClass != null && isContainerType(rawClass)) { resolvableType = resolvableType.getGeneric(0); } - if (Message.class.isAssignableFrom(resolvableType.getRawClass()) - || org.springframework.messaging.Message.class.isAssignableFrom(resolvableType.getRawClass())) { + if (resolvableType.getRawClass() != null && (Message.class.isAssignableFrom(resolvableType.getRawClass()) + || org.springframework.messaging.Message.class.isAssignableFrom(resolvableType.getRawClass()))) { resolvableType = resolvableType.getGeneric(0); } return resolvableType; diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarReaderEndpoint.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarReaderEndpoint.java index 258af324..46675af7 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarReaderEndpoint.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarReaderEndpoint.java @@ -93,9 +93,6 @@ protected AbstractPulsarMessageToSpringMessageAdapter createReaderListener( HandlerAdapter handlerMethod = configureListenerAdapter(readerListener); readerListener.setHandlerMethod(handlerMethod); - // Since we have access to the handler method here, check if we can type infer the - // Schema used. - // TODO: filter out the payload type by excluding Consumer, Message, Messages etc. MethodParameter[] methodParameters = handlerMethod.getInvokerHandlerMethod().getMethodParameters(); @@ -129,15 +126,6 @@ protected AbstractPulsarMessageToSpringMessageAdapter createReaderListener( } // TODO: If no topic info is set on endpoint attempt to resolve via message type - // TopicResolver topicResolver = pulsarContainerProperties.getTopicResolver(); - // boolean hasTopicInfo = - // !ObjectUtils.isEmpty(pulsarContainerProperties.getTopics()) - // || StringUtils.hasText(pulsarContainerProperties.getTopicsPattern()); - // if (!hasTopicInfo) { - // topicResolver.resolveTopic(null, messageType.getRawClass(), () -> null) - // .ifResolved((topic) -> pulsarContainerProperties.setTopics(new String[] { topic - // })); - // } container.setReaderCustomizer(this.readerBuilderCustomizer); @@ -150,8 +138,8 @@ private ResolvableType resolvableType(MethodParameter methodParameter) { if (rawClass != null && isContainerType(rawClass)) { resolvableType = resolvableType.getGeneric(0); } - if (Message.class.isAssignableFrom(resolvableType.getRawClass()) - || org.springframework.messaging.Message.class.isAssignableFrom(resolvableType.getRawClass())) { + if (resolvableType.getRawClass() != null && (Message.class.isAssignableFrom(resolvableType.getRawClass()) + || org.springframework.messaging.Message.class.isAssignableFrom(resolvableType.getRawClass()))) { resolvableType = resolvableType.getGeneric(0); } return resolvableType; diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/support/PulsarNull.java b/spring-pulsar/src/main/java/org/springframework/pulsar/support/PulsarNull.java new file mode 100644 index 00000000..cda2d04f --- /dev/null +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/support/PulsarNull.java @@ -0,0 +1,35 @@ +/* + * Copyright 2023-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.support; + +/** + * Represents a null record value. + * + * @author Chris Bono + * @since 1.0.1 + */ +public final class PulsarNull { + + /** + * Singleton instance. + */ + public static final PulsarNull INSTANCE = new PulsarNull(); + + private PulsarNull() { + } + +} diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/support/converter/PulsarMessageConverter.java b/spring-pulsar/src/main/java/org/springframework/pulsar/support/converter/PulsarMessageConverter.java index 01ad20dc..e222524c 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/support/converter/PulsarMessageConverter.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/support/converter/PulsarMessageConverter.java @@ -34,9 +34,6 @@ public interface PulsarMessageConverter extends MessageConverter { Message toMessage(org.apache.pulsar.client.api.Message record, Consumer consumer, Type payloadType); - default Message toMessageFromReader(org.apache.pulsar.client.api.Message record, Reader reader, - Type payloadType) { - return null; - } + Message toMessageFromReader(org.apache.pulsar.client.api.Message record, Reader reader, Type payloadType); } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/support/converter/PulsarRecordMessageConverter.java b/spring-pulsar/src/main/java/org/springframework/pulsar/support/converter/PulsarRecordMessageConverter.java index b167c331..f9e8927b 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/support/converter/PulsarRecordMessageConverter.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/support/converter/PulsarRecordMessageConverter.java @@ -19,10 +19,12 @@ import java.lang.reflect.Type; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Reader; import org.springframework.messaging.Message; import org.springframework.messaging.converter.SmartMessageConverter; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.pulsar.support.PulsarNull; import org.springframework.pulsar.support.header.PulsarHeaderMapper; /** @@ -49,6 +51,12 @@ public Message toMessage(org.apache.pulsar.client.api.Message record, Cons return MessageBuilder.createMessage(extractAndConvertValue(record), this.headerMapper.toSpringHeaders(record)); } + @Override + public Message toMessageFromReader(org.apache.pulsar.client.api.Message record, Reader reader, + Type payloadType) { + return MessageBuilder.createMessage(extractAndConvertValue(record), this.headerMapper.toSpringHeaders(record)); + } + protected org.springframework.messaging.converter.MessageConverter getMessagingConverter() { return this.messagingConverter; } @@ -58,7 +66,7 @@ public void setMessagingConverter(SmartMessageConverter messagingConverter) { } protected Object extractAndConvertValue(org.apache.pulsar.client.api.Message record) { - return record.getValue(); + return record.getValue() != null ? record.getValue() : PulsarNull.INSTANCE; } } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/function/PulsarFunctionAdministrationTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/function/PulsarFunctionAdministrationTests.java index e09d8fab..20e1668c 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/function/PulsarFunctionAdministrationTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/function/PulsarFunctionAdministrationTests.java @@ -27,7 +27,6 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -46,6 +45,7 @@ import org.springframework.pulsar.core.PulsarAdministration; import org.springframework.pulsar.function.PulsarFunctionAdministration.PulsarFunctionException; import org.springframework.pulsar.function.PulsarFunctionOperations.FunctionStopPolicy; +import org.springframework.pulsar.function.PulsarFunctionOperations.FunctionType; /** * Tests for {@link PulsarFunctionAdministration}. @@ -80,10 +80,16 @@ void setupSharedMocks() throws PulsarClientException, PulsarAdminException { // create function, sink, source mock but do not add to bean factory function1 = mock(PulsarFunction.class); when(function1.functionExists(pulsarAdmin)).thenReturn(false); + when(function1.name()).thenReturn("function1"); + when(function1.type()).thenReturn(FunctionType.FUNCTION); sink1 = mock(PulsarSink.class); when(sink1.functionExists(pulsarAdmin)).thenReturn(false); + when(sink1.name()).thenReturn("sink1"); + when(sink1.type()).thenReturn(FunctionType.SINK); source1 = mock(PulsarSource.class); when(source1.functionExists(pulsarAdmin)).thenReturn(false); + when(source1.name()).thenReturn("source1"); + when(source1.type()).thenReturn(FunctionType.SOURCE); } @Nested @@ -92,7 +98,7 @@ class ProperCreateUpdateApiCalled { @ParameterizedTest @ValueSource(strings = { "myfunc.jar", "builtin://myfunc" }) void create(String archive) throws PulsarAdminException { - PulsarFunction function = setupMockFunction(archive); + var function = setupMockFunction(archive); when(function.functionExists(pulsarAdmin)).thenReturn(false); functionAdmin.createOrUpdateUserDefinedFunctions(); verify(function).create(pulsarAdmin); @@ -102,7 +108,7 @@ void create(String archive) throws PulsarAdminException { @ValueSource(strings = { "https://myfunc.jar", "file:///myfunc.jar", "function://myfunc.jar", "sink://myfunc.jar", "source://myfunc.jar" }) void createWithUrl(String archive) throws PulsarAdminException { - PulsarFunction function = setupMockFunction(archive); + var function = setupMockFunction(archive); when(function.functionExists(pulsarAdmin)).thenReturn(false); functionAdmin.createOrUpdateUserDefinedFunctions(); verify(function).createWithUrl(pulsarAdmin); @@ -111,7 +117,7 @@ void createWithUrl(String archive) throws PulsarAdminException { @ParameterizedTest @ValueSource(strings = { "myfunc.jar", "builtin://myfunc" }) void update(String archive) throws PulsarAdminException { - PulsarFunction function = setupMockFunction(archive); + var function = setupMockFunction(archive); when(function.functionExists(pulsarAdmin)).thenReturn(true); functionAdmin.createOrUpdateUserDefinedFunctions(); verify(function).update(pulsarAdmin); @@ -121,15 +127,16 @@ void update(String archive) throws PulsarAdminException { @ValueSource(strings = { "https://myfunc.jar", "file:///myfunc.jar", "function://myfunc.jar", "sink://myfunc.jar", "source://myfunc.jar" }) void updateWithUrl(String archive) throws PulsarAdminException { - PulsarFunction function = setupMockFunction(archive); + var function = setupMockFunction(archive); when(function.functionExists(pulsarAdmin)).thenReturn(true); functionAdmin.createOrUpdateUserDefinedFunctions(); verify(function).updateWithUrl(pulsarAdmin); } private PulsarFunction setupMockFunction(String archive) { - PulsarFunction function = mock(PulsarFunction.class); + var function = mock(PulsarFunction.class); when(function.name()).thenReturn("function1"); + when(function.type()).thenReturn(FunctionType.FUNCTION); when(function.archive()).thenReturn(archive); beanFactory.addBean("myFunction", function); return function; @@ -219,10 +226,10 @@ class WithFailFast { @Test void firstProcessedFunctionFails() throws PulsarAdminException { - PulsarAdminException ex = new PulsarAdminException("BOOM"); + var ex = new PulsarAdminException("BOOM"); when(function1.functionExists(pulsarAdmin)).thenThrow(ex); - PulsarFunctionException thrown = catchThrowableOfType( - () -> functionAdmin.createOrUpdateUserDefinedFunctions(), PulsarFunctionException.class); + var thrown = catchThrowableOfType(() -> functionAdmin.createOrUpdateUserDefinedFunctions(), + PulsarFunctionException.class); assertThat(thrown.getFailures()).containsExactly(entry(function1, ex)); verify(function1, never()).create(pulsarAdmin); verify(function1, never()).update(pulsarAdmin); @@ -232,10 +239,10 @@ void firstProcessedFunctionFails() throws PulsarAdminException { @Test void middleProcessedFunctionFails() throws PulsarAdminException { - PulsarAdminException ex = new PulsarAdminException("BOOM"); + var ex = new PulsarAdminException("BOOM"); when(sink1.functionExists(pulsarAdmin)).thenThrow(ex); - PulsarFunctionException thrown = catchThrowableOfType( - () -> functionAdmin.createOrUpdateUserDefinedFunctions(), PulsarFunctionException.class); + var thrown = catchThrowableOfType(() -> functionAdmin.createOrUpdateUserDefinedFunctions(), + PulsarFunctionException.class); assertThat(thrown.getFailures()).containsExactly(entry(sink1, ex)); verify(function1).create(pulsarAdmin); verify(sink1, never()).create(pulsarAdmin); @@ -246,10 +253,10 @@ void middleProcessedFunctionFails() throws PulsarAdminException { @Test void lastProcessedFunctionFails() throws PulsarAdminException { - PulsarAdminException ex = new PulsarAdminException("BOOM"); + var ex = new PulsarAdminException("BOOM"); when(source1.functionExists(pulsarAdmin)).thenThrow(ex); - PulsarFunctionException thrown = catchThrowableOfType( - () -> functionAdmin.createOrUpdateUserDefinedFunctions(), PulsarFunctionException.class); + var thrown = catchThrowableOfType(() -> functionAdmin.createOrUpdateUserDefinedFunctions(), + PulsarFunctionException.class); assertThat(thrown.getFailures()).containsExactly(entry(source1, ex)); verify(function1).create(pulsarAdmin); verify(sink1).create(pulsarAdmin); @@ -273,10 +280,10 @@ void disableFailFastOnFunctionAdmin() { @Test void firstProcessedFunctionFails() throws PulsarAdminException { - PulsarAdminException ex = new PulsarAdminException("BOOM"); + var ex = new PulsarAdminException("BOOM"); when(function1.functionExists(pulsarAdmin)).thenThrow(ex); - PulsarFunctionException thrown = catchThrowableOfType( - () -> functionAdmin.createOrUpdateUserDefinedFunctions(), PulsarFunctionException.class); + var thrown = catchThrowableOfType(() -> functionAdmin.createOrUpdateUserDefinedFunctions(), + PulsarFunctionException.class); assertThat(thrown.getFailures()).containsExactly(entry(function1, ex)); verify(function1, never()).create(pulsarAdmin); verify(function1, never()).update(pulsarAdmin); @@ -287,10 +294,10 @@ void firstProcessedFunctionFails() throws PulsarAdminException { @Test void middleProcessedFunctionFails() throws PulsarAdminException { - PulsarAdminException ex = new PulsarAdminException("BOOM"); + var ex = new PulsarAdminException("BOOM"); when(sink1.functionExists(pulsarAdmin)).thenThrow(ex); - PulsarFunctionException thrown = catchThrowableOfType( - () -> functionAdmin.createOrUpdateUserDefinedFunctions(), PulsarFunctionException.class); + var thrown = catchThrowableOfType(() -> functionAdmin.createOrUpdateUserDefinedFunctions(), + PulsarFunctionException.class); assertThat(thrown.getFailures()).containsExactly(entry(sink1, ex)); verify(function1).create(pulsarAdmin); verify(sink1, never()).create(pulsarAdmin); @@ -301,10 +308,10 @@ void middleProcessedFunctionFails() throws PulsarAdminException { @Test void lastProcessedFunctionFails() throws PulsarAdminException { - PulsarAdminException ex = new PulsarAdminException("BOOM"); + var ex = new PulsarAdminException("BOOM"); when(source1.functionExists(pulsarAdmin)).thenThrow(ex); - PulsarFunctionException thrown = catchThrowableOfType( - () -> functionAdmin.createOrUpdateUserDefinedFunctions(), PulsarFunctionException.class); + var thrown = catchThrowableOfType(() -> functionAdmin.createOrUpdateUserDefinedFunctions(), + PulsarFunctionException.class); assertThat(thrown.getFailures()).containsExactly(entry(source1, ex)); verify(function1).create(pulsarAdmin); verify(sink1).create(pulsarAdmin); @@ -321,8 +328,8 @@ void allProcessedFunctionsFail() throws PulsarAdminException { when(function1.functionExists(pulsarAdmin)).thenThrow(ex1); when(sink1.functionExists(pulsarAdmin)).thenThrow(ex2); when(source1.functionExists(pulsarAdmin)).thenThrow(ex3); - PulsarFunctionException thrown = catchThrowableOfType( - () -> functionAdmin.createOrUpdateUserDefinedFunctions(), PulsarFunctionException.class); + var thrown = catchThrowableOfType(() -> functionAdmin.createOrUpdateUserDefinedFunctions(), + PulsarFunctionException.class); assertThat(thrown.getFailures()).containsExactly(entry(function1, ex1), entry(sink1, ex2), entry(source1, ex3)); verify(function1, never()).create(pulsarAdmin); @@ -351,7 +358,7 @@ void disablePropagationOnFunctionAdmin() { void createAdminClientFails() throws Exception { beanFactory.addBean("function1", function1); when(springPulsarAdmin.createAdminClient()).thenThrow(new PulsarClientException("NOPE")); - String output = tapSystemErrAndOutNormalized(() -> functionAdmin.createOrUpdateUserDefinedFunctions()); + var output = tapSystemErrAndOutNormalized(() -> functionAdmin.createOrUpdateUserDefinedFunctions()); assertThat(output).contains("Unable to create/update functions - could not create PulsarAdmin: NOPE"); } @@ -359,7 +366,7 @@ void createAdminClientFails() throws Exception { void processedFunctionFails() throws Exception { beanFactory.addBean("function1", function1); when(function1.functionExists(pulsarAdmin)).thenThrow(new PulsarAdminException("BOOM")); - String output = tapSystemErrAndOutNormalized(() -> functionAdmin.createOrUpdateUserDefinedFunctions()); + var output = tapSystemErrAndOutNormalized(() -> functionAdmin.createOrUpdateUserDefinedFunctions()); assertThat(output).contains("Encountered 1 error(s) creating/updating functions:", "PulsarAdminException: BOOM"); } @@ -373,17 +380,17 @@ class ProperStopPolicyApiCalled { @Test void none() { - PulsarFunction function = mock(PulsarFunction.class); + var function = mockFunction("noneFunction"); when(function.stopPolicy()).thenReturn(FunctionStopPolicy.NONE); functionAdmin.getProcessedFunctions().add(function); functionAdmin.enforceStopPolicyOnUserDefinedFunctions(); verify(function).stopPolicy(); - verifyNoMoreInteractions(function); + verify(function, never()).stop(pulsarAdmin); } @Test void stop() { - PulsarFunction function = mock(PulsarFunction.class); + var function = mockFunction("stopFunction"); when(function.stopPolicy()).thenReturn(FunctionStopPolicy.STOP); functionAdmin.getProcessedFunctions().add(function); functionAdmin.enforceStopPolicyOnUserDefinedFunctions(); @@ -392,13 +399,20 @@ void stop() { @Test void delete() { - PulsarFunction function = mock(PulsarFunction.class); + var function = mockFunction("deleteFunction"); when(function.stopPolicy()).thenReturn(FunctionStopPolicy.DELETE); functionAdmin.getProcessedFunctions().add(function); functionAdmin.enforceStopPolicyOnUserDefinedFunctions(); verify(function).delete(pulsarAdmin); } + PulsarFunction mockFunction(String name) { + var function = mock(PulsarFunction.class); + when(function.name()).thenReturn(name); + when(function.type()).thenReturn(FunctionType.FUNCTION); + return function; + } + } @Nested @@ -458,10 +472,10 @@ void createAdminClientFails() throws PulsarClientException { @Test void firstProcessedFunctionFails() { - PulsarException ex = new PulsarException("BOOM"); + var ex = new PulsarException("BOOM"); doThrow(ex).when(source1).stop(pulsarAdmin); - PulsarFunctionException thrown = catchThrowableOfType( - () -> functionAdmin.enforceStopPolicyOnUserDefinedFunctions(), PulsarFunctionException.class); + var thrown = catchThrowableOfType(() -> functionAdmin.enforceStopPolicyOnUserDefinedFunctions(), + PulsarFunctionException.class); assertThat(thrown.getFailures()).containsExactly(entry(source1, ex)); verify(sink1).stop(pulsarAdmin); verify(function1).stop(pulsarAdmin); @@ -469,10 +483,10 @@ void firstProcessedFunctionFails() { @Test void middleProcessedFunctionFails() { - PulsarException ex = new PulsarException("BOOM"); + var ex = new PulsarException("BOOM"); doThrow(ex).when(sink1).stop(pulsarAdmin); - PulsarFunctionException thrown = catchThrowableOfType( - () -> functionAdmin.enforceStopPolicyOnUserDefinedFunctions(), PulsarFunctionException.class); + var thrown = catchThrowableOfType(() -> functionAdmin.enforceStopPolicyOnUserDefinedFunctions(), + PulsarFunctionException.class); assertThat(thrown.getFailures()).containsExactly(entry(sink1, ex)); verify(source1).stop(pulsarAdmin); verify(function1).stop(pulsarAdmin); @@ -480,10 +494,10 @@ void middleProcessedFunctionFails() { @Test void lastProcessedFunctionFails() { - PulsarException ex = new PulsarException("BOOM"); + var ex = new PulsarException("BOOM"); doThrow(ex).when(function1).stop(pulsarAdmin); - PulsarFunctionException thrown = catchThrowableOfType( - () -> functionAdmin.enforceStopPolicyOnUserDefinedFunctions(), PulsarFunctionException.class); + var thrown = catchThrowableOfType(() -> functionAdmin.enforceStopPolicyOnUserDefinedFunctions(), + PulsarFunctionException.class); assertThat(thrown.getFailures()).containsExactly(entry(function1, ex)); verify(source1).stop(pulsarAdmin); verify(sink1).stop(pulsarAdmin); @@ -497,8 +511,8 @@ void allProcessedFunctionsFail() { doThrow(ex1).when(source1).stop(pulsarAdmin); doThrow(ex2).when(sink1).stop(pulsarAdmin); doThrow(ex3).when(function1).stop(pulsarAdmin); - PulsarFunctionException thrown = catchThrowableOfType( - () -> functionAdmin.enforceStopPolicyOnUserDefinedFunctions(), PulsarFunctionException.class); + var thrown = catchThrowableOfType(() -> functionAdmin.enforceStopPolicyOnUserDefinedFunctions(), + PulsarFunctionException.class); assertThat(thrown.getFailures()).containsExactly(entry(source1, ex1), entry(sink1, ex2), entry(function1, ex3)); } @@ -518,7 +532,7 @@ void disableStopPropagationOnFunctionAdmin() { void createAdminClientFails() throws Exception { functionAdmin.getProcessedFunctions().add(function1); when(springPulsarAdmin.createAdminClient()).thenThrow(new PulsarClientException("NOPE")); - String output = tapSystemErrAndOutNormalized( + var output = tapSystemErrAndOutNormalized( () -> functionAdmin.enforceStopPolicyOnUserDefinedFunctions()); assertThat(output) .contains("Unable to enforce stop policy on functions - could not create PulsarAdmin: NOPE"); @@ -528,7 +542,7 @@ void createAdminClientFails() throws Exception { void processedFunctionFails() throws Exception { functionAdmin.getProcessedFunctions().add(function1); doThrow(new PulsarException("BOOM")).when(function1).stop(pulsarAdmin); - String output = tapSystemErrAndOutNormalized( + var output = tapSystemErrAndOutNormalized( () -> functionAdmin.enforceStopPolicyOnUserDefinedFunctions()); assertThat(output).contains("Encountered 1 error(s) enforcing stop policy on functions:", "PulsarException: BOOM"); diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTestsBase.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTestsBase.java index 7cdd0692..4310c381 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTestsBase.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTestsBase.java @@ -24,6 +24,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.pulsar.annotation.EnablePulsar; +import org.springframework.pulsar.annotation.PulsarListener; import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory; import org.springframework.pulsar.config.PulsarListenerContainerFactory; import org.springframework.pulsar.core.ConsumerBuilderCustomizer; @@ -40,6 +41,8 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; /** + * Provides base support for {@link PulsarListener @PulsarListener} tests. + * * @author Soby Chacko * @author Alexander Preuß * @author Chris Bono diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTombstoneTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTombstoneTests.java new file mode 100644 index 00000000..1f18afda --- /dev/null +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTombstoneTests.java @@ -0,0 +1,356 @@ +/* + * Copyright 2022-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.listener; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaType; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.Message; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.pulsar.annotation.PulsarListener; +import org.springframework.pulsar.core.DefaultPulsarProducerFactory; +import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.pulsar.listener.PulsarListenerTombstoneTests.BatchComplexPayload.BatchComplexPayloadConfig; +import org.springframework.pulsar.listener.PulsarListenerTombstoneTests.BatchPrimitivePayload.BatchPrimitivePayloadConfig; +import org.springframework.pulsar.listener.PulsarListenerTombstoneTests.PulsarMessagePayload.PulsarMessagePayloadConfig; +import org.springframework.pulsar.listener.PulsarListenerTombstoneTests.SingleComplexPayload.SingleComplexPayloadConfig; +import org.springframework.pulsar.listener.PulsarListenerTombstoneTests.SinglePrimitivePayload.SinglePrimitivePayloadConfig; +import org.springframework.pulsar.listener.PulsarListenerTombstoneTests.SpringMessagePayload.SpringMessagePayloadConfig; +import org.springframework.pulsar.support.PulsarHeaders; +import org.springframework.pulsar.support.PulsarNull; +import org.springframework.test.context.ContextConfiguration; + +/** + * Tests consuming {@link PulsarNull tombstone} records in + * {@link PulsarListener @PulsarListener}. + * + * @author Chris Bono + */ +class PulsarListenerTombstoneTests extends PulsarListenerTestsBase { + + static void sendTestMessages(PulsarTemplate pulsarTemplate, String topic, Schema schema, + Function payloadFactory) throws PulsarClientException { + pulsarTemplate.newMessage(payloadFactory.apply("foo")) + .withTopic(topic) + .withMessageCustomizer((mb) -> mb.key("key:foo")) + .send(); + pulsarTemplate.newMessage(null) + .withTopic(topic) + .withSchema(schema) + .withMessageCustomizer((mb) -> mb.key("key:null")) + .send(); + pulsarTemplate.newMessage(payloadFactory.apply("bar")) + .withTopic(topic) + .withMessageCustomizer((mb) -> mb.key("key:bar")) + .send(); + } + + static void assertMessagesReceivedWithHeaders(List> receivedMessages, + Function payloadFactory) { + assertThat(receivedMessages).containsExactly(new ReceivedMessage<>(payloadFactory.apply("foo"), "key:foo"), + new ReceivedMessage<>(null, "key:null"), new ReceivedMessage<>(payloadFactory.apply("bar"), "key:bar")); + } + + static void assertMessagesReceivedWithoutHeaders(List> receivedMessages, + Function payloadFactory) { + assertThat(receivedMessages).containsExactly(new ReceivedMessage<>(payloadFactory.apply("foo"), null), + new ReceivedMessage<>(null, null), new ReceivedMessage<>(payloadFactory.apply("bar"), null)); + } + + @Nested + @ContextConfiguration(classes = PulsarMessagePayloadConfig.class) + class PulsarMessagePayload { + + private static final String TOPIC = "pltt-pulsar-msg-topic"; + + static CountDownLatch latchWithHeaders = new CountDownLatch(3); + static CountDownLatch latchWithoutHeaders = new CountDownLatch(3); + static List> receivedMessagesWithHeaders = new ArrayList<>(); + static List> receivedMessagesWithoutHeaders = new ArrayList<>(); + + @Test + void shouldReceiveMessagesWithTombstone() throws Exception { + sendTestMessages(pulsarTemplate, TOPIC, Schema.STRING, Function.identity()); + assertThat(latchWithHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(latchWithoutHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertMessagesReceivedWithHeaders(receivedMessagesWithHeaders, Function.identity()); + assertMessagesReceivedWithoutHeaders(receivedMessagesWithoutHeaders, Function.identity()); + } + + @Configuration(proxyBeanMethods = false) + static class PulsarMessagePayloadConfig { + + @PulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub-headers", schemaType = SchemaType.STRING, + properties = { "subscriptionInitialPosition=Earliest" }) + public void listenWithHeaders(org.apache.pulsar.client.api.Message msg, + @Header(PulsarHeaders.KEY) String key) { + receivedMessagesWithHeaders.add(new ReceivedMessage<>(msg.getValue(), key)); + latchWithHeaders.countDown(); + } + + @PulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub-no-headers", + schemaType = SchemaType.STRING, properties = { "subscriptionInitialPosition=Earliest" }) + public void listenWithoutHeaders(org.apache.pulsar.client.api.Message msg) { + receivedMessagesWithoutHeaders.add(new ReceivedMessage<>(msg.getValue(), null)); + latchWithoutHeaders.countDown(); + } + + } + + } + + @Nested + @ContextConfiguration(classes = SpringMessagePayloadConfig.class) + class SpringMessagePayload { + + private static final String TOPIC = "pltt-spring-msg-topic"; + + static CountDownLatch latchWithHeaders = new CountDownLatch(3); + static CountDownLatch latchWithoutHeaders = new CountDownLatch(3); + static List> receivedMessagesWithHeaders = new ArrayList<>(); + static List> receivedMessagesWithoutHeaders = new ArrayList<>(); + + @Test + void shouldReceiveMessagesWithTombstone() throws Exception { + sendTestMessages(pulsarTemplate, TOPIC, Schema.STRING, Function.identity()); + assertThat(latchWithHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(latchWithoutHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertMessagesReceivedWithHeaders(receivedMessagesWithHeaders, Function.identity()); + assertMessagesReceivedWithoutHeaders(receivedMessagesWithoutHeaders, Function.identity()); + } + + @Configuration(proxyBeanMethods = false) + static class SpringMessagePayloadConfig { + + @PulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub-headers", schemaType = SchemaType.STRING, + properties = { "subscriptionInitialPosition=Earliest" }) + public void listenWithHeaders(Message msg, @Header(PulsarHeaders.KEY) String key) { + var payload = (msg.getPayload() != PulsarNull.INSTANCE) ? msg.getPayload().toString() : null; + receivedMessagesWithHeaders.add(new ReceivedMessage<>(payload, key)); + latchWithHeaders.countDown(); + } + + @PulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub-no-headers", + schemaType = SchemaType.STRING, properties = { "subscriptionInitialPosition=Earliest" }) + public void listenWithoutHeaders(Message msg) { + var payload = (msg.getPayload() != PulsarNull.INSTANCE) ? msg.getPayload().toString() : null; + receivedMessagesWithoutHeaders.add(new ReceivedMessage<>(payload, null)); + latchWithoutHeaders.countDown(); + } + + } + + } + + @Nested + @ContextConfiguration(classes = SinglePrimitivePayloadConfig.class) + class SinglePrimitivePayload { + + private static final String TOPIC = "pltt-single-primitive-topic"; + + static CountDownLatch latchWithHeaders = new CountDownLatch(3); + static CountDownLatch latchWithoutHeaders = new CountDownLatch(3); + static List> receivedMessagesWithHeaders = new ArrayList<>(); + static List> receivedMessagesWithoutHeaders = new ArrayList<>(); + + @Test + void shouldReceiveMessagesWithTombstone() throws Exception { + sendTestMessages(pulsarTemplate, TOPIC, Schema.STRING, Function.identity()); + assertThat(latchWithHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(latchWithoutHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertMessagesReceivedWithHeaders(receivedMessagesWithHeaders, Function.identity()); + assertMessagesReceivedWithoutHeaders(receivedMessagesWithoutHeaders, Function.identity()); + } + + @Configuration(proxyBeanMethods = false) + static class SinglePrimitivePayloadConfig { + + @PulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub-headers", schemaType = SchemaType.STRING, + properties = { "subscriptionInitialPosition=Earliest" }) + public void listenWithHeaders(@Payload(required = false) String msg, + @Header(PulsarHeaders.KEY) String key) { + receivedMessagesWithHeaders.add(new ReceivedMessage<>(msg, key)); + latchWithHeaders.countDown(); + } + + @PulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub-no-headers", + schemaType = SchemaType.STRING, properties = { "subscriptionInitialPosition=Earliest" }) + public void listenWithoutHeaders(@Payload(required = false) String msg) { + receivedMessagesWithoutHeaders.add(new ReceivedMessage<>(msg, null)); + latchWithoutHeaders.countDown(); + } + + } + + } + + @Nested + @ContextConfiguration(classes = BatchPrimitivePayloadConfig.class) + class BatchPrimitivePayload { + + private static final String TOPIC = "pltt-multi-primitive-topic"; + + static CountDownLatch latchWithHeaders = new CountDownLatch(3); + static CountDownLatch latchWithoutHeaders = new CountDownLatch(3); + static List> receivedMessagesWithHeaders = new ArrayList<>(); + static List> receivedMessagesWithoutHeaders = new ArrayList<>(); + + @Test + void shouldReceiveMessagesWithTombstone() throws Exception { + sendTestMessages(pulsarTemplate, TOPIC, Schema.STRING, Function.identity()); + assertThat(latchWithHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(latchWithoutHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertMessagesReceivedWithHeaders(receivedMessagesWithHeaders, Function.identity()); + assertMessagesReceivedWithoutHeaders(receivedMessagesWithoutHeaders, Function.identity()); + } + + @Configuration(proxyBeanMethods = false) + static class BatchPrimitivePayloadConfig { + + @PulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub-headers", schemaType = SchemaType.STRING, + batch = true, properties = { "subscriptionInitialPosition=Earliest" }) + public void listenWithHeaders(List msgs, @Header(PulsarHeaders.KEY) List keys) { + for (int i = 0; i < msgs.size(); i++) { + receivedMessagesWithHeaders.add(new ReceivedMessage<>(msgs.get(i), keys.get(i))); + latchWithHeaders.countDown(); + } + } + + @PulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub-no-headers", + schemaType = SchemaType.STRING, batch = true, + properties = { "subscriptionInitialPosition=Earliest" }) + public void listenMultipleNoHeaders(List msgs) { + for (int i = 0; i < msgs.size(); i++) { + receivedMessagesWithoutHeaders.add(new ReceivedMessage<>(msgs.get(i), null)); + latchWithoutHeaders.countDown(); + } + } + + } + + } + + @Nested + @ContextConfiguration(classes = SingleComplexPayloadConfig.class) + class SingleComplexPayload { + + private static final String TOPIC = "pltt-single-complex-topic"; + + static CountDownLatch latchWithHeaders = new CountDownLatch(3); + static CountDownLatch latchWithoutHeaders = new CountDownLatch(3); + static List> receivedMessagesWithHeaders = new ArrayList<>(); + static List> receivedMessagesWithoutHeaders = new ArrayList<>(); + + @Test + void shouldReceiveMessagesWithTombstone() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var fooPulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory); + sendTestMessages(fooPulsarTemplate, TOPIC, Schema.JSON(Foo.class), Foo::new); + assertThat(latchWithHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(latchWithoutHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertMessagesReceivedWithHeaders(receivedMessagesWithHeaders, Foo::new); + assertMessagesReceivedWithoutHeaders(receivedMessagesWithoutHeaders, Foo::new); + } + + @Configuration(proxyBeanMethods = false) + static class SingleComplexPayloadConfig { + + @PulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub-headers", schemaType = SchemaType.JSON, + properties = { "subscriptionInitialPosition=Earliest" }) + public void listenWithHeaders(@Payload(required = false) Foo msg, @Header(PulsarHeaders.KEY) String key) { + receivedMessagesWithHeaders.add(new ReceivedMessage<>(msg, key)); + latchWithHeaders.countDown(); + } + + @PulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub-no-headers", schemaType = SchemaType.JSON, + properties = { "subscriptionInitialPosition=Earliest" }) + public void listenWithoutHeaders(@Payload(required = false) Foo msg) { + receivedMessagesWithoutHeaders.add(new ReceivedMessage<>(msg, null)); + latchWithoutHeaders.countDown(); + } + + } + + } + + @Nested + @ContextConfiguration(classes = BatchComplexPayloadConfig.class) + class BatchComplexPayload { + + private static final String TOPIC = "pltt-multi-complex-topic"; + + static CountDownLatch latchWithHeaders = new CountDownLatch(3); + static CountDownLatch latchWithoutHeaders = new CountDownLatch(3); + static List> receivedMessagesWithHeaders = new ArrayList<>(); + static List> receivedMessagesWithoutHeaders = new ArrayList<>(); + + @Test + void shouldReceiveMessagesWithTombstone() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var fooPulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory); + sendTestMessages(fooPulsarTemplate, TOPIC, Schema.JSON(Foo.class), Foo::new); + assertThat(latchWithHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(latchWithoutHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertMessagesReceivedWithHeaders(receivedMessagesWithHeaders, Foo::new); + assertMessagesReceivedWithoutHeaders(receivedMessagesWithoutHeaders, Foo::new); + } + + @Configuration(proxyBeanMethods = false) + static class BatchComplexPayloadConfig { + + @PulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub-headers", schemaType = SchemaType.JSON, + batch = true, properties = { "subscriptionInitialPosition=Earliest" }) + public void listenWithHeaders(List msgs, @Header(PulsarHeaders.KEY) List keys) { + for (int i = 0; i < msgs.size(); i++) { + receivedMessagesWithHeaders.add(new ReceivedMessage<>(msgs.get(i), keys.get(i))); + latchWithHeaders.countDown(); + } + } + + @PulsarListener(topics = TOPIC, subscriptionName = TOPIC + "-sub-no-headers", schemaType = SchemaType.JSON, + batch = true, properties = { "subscriptionInitialPosition=Earliest" }) + public void listenMultipleNoHeaders(List msgs) { + for (int i = 0; i < msgs.size(); i++) { + receivedMessagesWithoutHeaders.add(new ReceivedMessage<>(msgs.get(i), null)); + latchWithoutHeaders.countDown(); + } + } + + } + + } + + record Foo(String value) { + } + + record ReceivedMessage(T payload, String keyHeader) { + } + +} diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderBasicTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderBasicTests.java new file mode 100644 index 00000000..2935fe1a --- /dev/null +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderBasicTests.java @@ -0,0 +1,204 @@ +/* + * Copyright 2022-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.reader; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaType; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.Message; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.pulsar.annotation.PulsarReader; +import org.springframework.pulsar.core.DefaultPulsarProducerFactory; +import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.pulsar.reader.PulsarReaderBasicTests.PulsarMessagePayload.PulsarMessagePayloadConfig; +import org.springframework.pulsar.reader.PulsarReaderBasicTests.SingleComplexPayload.SingleComplexPayloadConfig; +import org.springframework.pulsar.reader.PulsarReaderBasicTests.SinglePrimitivePayload.SinglePrimitivePayloadConfig; +import org.springframework.pulsar.reader.PulsarReaderBasicTests.SpringMessagePayload.SpringMessagePayloadConfig; +import org.springframework.pulsar.support.PulsarNull; +import org.springframework.test.context.ContextConfiguration; + +/** + * Tests consuming records (including {@link PulsarNull tombstones}) in + * {@link PulsarReader @PulsarReader}. + * + * @author Chris Bono + */ +class PulsarReaderBasicTests extends PulsarReaderTestsBase { + + static void sendTestMessages(PulsarTemplate pulsarTemplate, String topic, Schema schema, + Function payloadFactory) throws PulsarClientException { + pulsarTemplate.newMessage(payloadFactory.apply("foo")) + .withTopic(topic) + .withMessageCustomizer((mb) -> mb.key("key:foo")) + .send(); + pulsarTemplate.newMessage(null) + .withTopic(topic) + .withSchema(schema) + .withMessageCustomizer((mb) -> mb.key("key:null")) + .send(); + pulsarTemplate.newMessage(payloadFactory.apply("bar")) + .withTopic(topic) + .withMessageCustomizer((mb) -> mb.key("key:bar")) + .send(); + } + + static void assertMessagesReceivedWithoutHeaders(List> receivedMessages, + Function payloadFactory) { + assertThat(receivedMessages).containsExactly(new ReceivedMessage<>(payloadFactory.apply("foo"), null), + new ReceivedMessage<>(null, null), new ReceivedMessage<>(payloadFactory.apply("bar"), null)); + } + + @Nested + @ContextConfiguration(classes = PulsarMessagePayloadConfig.class) + class PulsarMessagePayload { + + private static final String TOPIC = "prbt-pulsar-msg-topic"; + + static CountDownLatch latchWithoutHeaders = new CountDownLatch(3); + static List> receivedMessagesWithoutHeaders = new ArrayList<>(); + + @Test + void shouldReceiveMessagesWithTombstone() throws Exception { + sendTestMessages(pulsarTemplate, TOPIC, Schema.STRING, Function.identity()); + assertThat(latchWithoutHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertMessagesReceivedWithoutHeaders(receivedMessagesWithoutHeaders, Function.identity()); + } + + @Configuration(proxyBeanMethods = false) + static class PulsarMessagePayloadConfig { + + @PulsarReader(topics = TOPIC, schemaType = SchemaType.STRING, startMessageId = "earliest") + public void listenWithoutHeaders(org.apache.pulsar.client.api.Message msg) { + receivedMessagesWithoutHeaders.add(new ReceivedMessage<>(msg.getValue(), null)); + latchWithoutHeaders.countDown(); + } + + } + + } + + @Nested + @ContextConfiguration(classes = SpringMessagePayloadConfig.class) + class SpringMessagePayload { + + private static final String TOPIC = "prbt-spring-msg-topic"; + + static CountDownLatch latchWithoutHeaders = new CountDownLatch(3); + static List> receivedMessagesWithoutHeaders = new ArrayList<>(); + + @Test + void shouldReceiveMessagesWithTombstone() throws Exception { + sendTestMessages(pulsarTemplate, TOPIC, Schema.STRING, Function.identity()); + assertThat(latchWithoutHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertMessagesReceivedWithoutHeaders(receivedMessagesWithoutHeaders, Function.identity()); + + } + + @Configuration(proxyBeanMethods = false) + static class SpringMessagePayloadConfig { + + @PulsarReader(topics = TOPIC, schemaType = SchemaType.STRING, startMessageId = "earliest") + public void listenWithoutHeaders(Message msg) { + var payload = (msg.getPayload() != PulsarNull.INSTANCE) ? msg.getPayload().toString() : null; + assertThat(msg.getHeaders()).isNotEmpty(); + receivedMessagesWithoutHeaders.add(new ReceivedMessage<>(payload, null)); + latchWithoutHeaders.countDown(); + } + + } + + } + + @Nested + @ContextConfiguration(classes = SinglePrimitivePayloadConfig.class) + class SinglePrimitivePayload { + + private static final String TOPIC = "prbt-single-primitive-topic"; + + static CountDownLatch latchWithoutHeaders = new CountDownLatch(3); + static List> receivedMessagesWithoutHeaders = new ArrayList<>(); + + @Test + void shouldReceiveMessagesWithTombstone() throws Exception { + sendTestMessages(pulsarTemplate, TOPIC, Schema.STRING, Function.identity()); + assertThat(latchWithoutHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertMessagesReceivedWithoutHeaders(receivedMessagesWithoutHeaders, Function.identity()); + } + + @Configuration(proxyBeanMethods = false) + static class SinglePrimitivePayloadConfig { + + @PulsarReader(topics = TOPIC, schemaType = SchemaType.STRING, startMessageId = "earliest") + public void listenWithoutHeaders(@Payload(required = false) String msg) { + receivedMessagesWithoutHeaders.add(new ReceivedMessage<>(msg, null)); + latchWithoutHeaders.countDown(); + } + + } + + } + + @Nested + @ContextConfiguration(classes = SingleComplexPayloadConfig.class) + class SingleComplexPayload { + + private static final String TOPIC = "prbt-single-complex-topic"; + + static CountDownLatch latchWithoutHeaders = new CountDownLatch(3); + static List> receivedMessagesWithoutHeaders = new ArrayList<>(); + + @Test + void shouldReceiveMessagesWithTombstone() throws Exception { + var pulsarProducerFactory = new DefaultPulsarProducerFactory(pulsarClient); + var fooPulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory); + sendTestMessages(fooPulsarTemplate, TOPIC, Schema.JSON(Foo.class), Foo::new); + assertThat(latchWithoutHeaders.await(5, TimeUnit.SECONDS)).isTrue(); + assertMessagesReceivedWithoutHeaders(receivedMessagesWithoutHeaders, Foo::new); + } + + @Configuration(proxyBeanMethods = false) + static class SingleComplexPayloadConfig { + + @PulsarReader(topics = TOPIC, schemaType = SchemaType.JSON, startMessageId = "earliest") + public void listenWithoutHeaders(@Payload(required = false) Foo msg) { + receivedMessagesWithoutHeaders.add(new ReceivedMessage<>(msg, null)); + latchWithoutHeaders.countDown(); + } + + } + + } + + record Foo(String value) { + } + + record ReceivedMessage(T payload, String keyHeader) { + } + +} diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderStartMessageIdTests.java similarity index 98% rename from spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderTests.java rename to spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderStartMessageIdTests.java index 78a69b80..4150cf9a 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderStartMessageIdTests.java @@ -43,7 +43,7 @@ * @author Soby Chacko * @author Chris Bono */ -public class PulsarReaderTests extends PulsarReaderTestsBase { +public class PulsarReaderStartMessageIdTests extends PulsarReaderTestsBase { @Nested @ContextConfiguration(classes = StartMessageIdEarliest.PulsarReaderStartMessageIdEarliest.class) diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderTestsBase.java b/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderTestsBase.java index 3cd32efe..085beafe 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderTestsBase.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderTestsBase.java @@ -37,7 +37,7 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; /** - * Tests for {@link PulsarReader}. + * Provides base support for {@link PulsarReader} tests. * * @author Soby Chacko * @author Chris Bono diff --git a/spring-pulsar/src/test/resources/logback-test.xml b/spring-pulsar/src/test/resources/logback-test.xml index 554cd1bd..8a2d2bbe 100644 --- a/spring-pulsar/src/test/resources/logback-test.xml +++ b/spring-pulsar/src/test/resources/logback-test.xml @@ -4,7 +4,7 @@ %d{HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n - +