Skip to content

Commit

Permalink
Add tombstone support to listeners (#507)
Browse files Browse the repository at this point in the history
* Add tombstone support for @PulsarListener
* Add tombstone support for @PulsarReader
* Add tombstone support for @ReactivePulsarListener

Resolves #506
  • Loading branch information
onobc authored Dec 10, 2023
1 parent d4183fe commit 77462c2
Show file tree
Hide file tree
Showing 23 changed files with 1,377 additions and 161 deletions.
2 changes: 2 additions & 0 deletions spring-pulsar-docs/src/main/antora/modules/ROOT/nav.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
** xref:intro/getting-help.adoc[Getting Help]
* xref:reference/reference.adoc[]
** xref:reference/pulsar.adoc[]
*** xref:reference/tombstones.adoc[]
** xref:reference/reactive-pulsar.adoc[]
*** xref:reference/tombstones-reactive.adoc[]
** xref:reference/pulsar-admin.adoc[]
** xref:reference/pulsar-function.adoc[]
** xref:reference/observability.adoc[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ One advantage of the reactive concurrency model is that it can be used with `Exc
The Pulsar message metadata can be consumed as Spring message headers.
The list of available headers can be found in {github}/blob/main/spring-pulsar/src/main/java/org/springframework/pulsar/support/PulsarHeaders.java[PulsarHeaders.java].

[[reactive-pulsar-headers.single]]
==== Accessing In OneByOne Listener
The following example shows how you can access Pulsar Headers when using a one-by-one message listener:

Expand All @@ -359,6 +360,7 @@ When doing so, the user can directly call the corresponding methods on the Pulsa
However, as a convenience, you can also retrieve it by using the `Header` annotation.
Note that you can also use the Spring messaging `Message` envelope to carry the payload and then retrieve the Pulsar headers by using `@Header`.

[[reactive-pulsar-headers.streaming]]
==== Accessing In Streaming Listener
When using a streaming message listener the header support is limited.
Only when the `Flux` contains Spring `org.springframework.messaging.Message` elements will the headers be populated.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
[[tombstones-reactive]]
= Null Payloads and Log Compaction of 'Tombstone' Records

When using log compaction, you can send and receive messages with `null` payloads to identify the deletion of a key.
You can also receive `null` values for other reasons, such as a deserializer that might return `null` when it cannot deserialize a value.

[[tombstones-reactive.produce]]
== Producing Null Payloads
You can send a `null` value with the `ReactivePulsarTemplate` by passing a `null` message parameter value to one of the `send` methods, for example:
[source, java]
----
reactiveTemplate
.send(null, Schema.STRING)
.subscribe();
----
NOTE: When sending null values you must specify the schema type as the system can not determine the type of the message from a `null` payload.

[[tombstones-reactive.consume]]
== Consuming Null Payloads
For `@ReactivePularListener`, the `null` payload is passed into the listener method based on the type of its message parameter as follows:
|===
| Parameter type | Passed-in value

| primitive
| `null`

| user-defined
| `null`

| `org.apache.pulsar.client.api.Message<T>`
| non-null Pulsar message whose `getValue()` returns `null`

| `org.springframework.messaging.Message<T>`
| non-null Spring message whose `getPayload()` returns `PulsarNull`

| `Flux<org.apache.pulsar.client.api.Message<T>>`
| non-null flux whose entries are non-null Pulsar messages whose `getValue()` returns `null`

| `Flux<org.apache.pulsar.client.api.Messages<T>>`
| non-null flux whose entries are non-null Spring messages whose `getPayload()` returns `PulsarNull`

|===

IMPORTANT: When the passed-in value is `null` (ie. single record listeners with primitive or user-defined types) you must use the `@Payload` parameter annotation with `required = false`.

IMPORTANT: When using the Spring `org.springframework.messaging.Message` for your listener payload type, its generic type information must be wide enough to accept `Message<PulsarNull>` (eg. `Message`, `Message<?>`, or `Message<Object>`).
This is due to the fact that the Spring Message does not allow null values for its payload and instead uses the `PulsarNull` placeholder.

If it is a tombstone message for a compacted log, you usually also need the key so that your application can determine which key was +++"+++`deleted`+++"+++.
The following example shows such a configuration:

[source, java]
----
@ReactivePulsarListener(
topics = "my-topic",
subscriptionName = "my-topic-sub",
schemaType = SchemaType.STRING)
Mono<Void> myListener(
@Payload(required = false) String msg,
@Header(PulsarHeaders.KEY) String key) {
...
}
----

NOTE: When using a streaming message listener (`Flux`) the xref:reference/reactive-pulsar.adoc#reactive-pulsar-headers.streaming[header support is limited], so it less useful in the log compaction scenario.
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
[[tombstones]]
= Null Payloads and Log Compaction of 'Tombstone' Records

When using log compaction, you can send and receive messages with `null` payloads to identify the deletion of a key.
You can also receive `null` values for other reasons, such as a deserializer that might return `null` when it cannot deserialize a value.

[[tombstones.produce]]
== Producing Null Payloads
To send a `null` payload by using the `PulsarTemplate`, you can use the fluent API and pass null into the value argument of the `newMessage()` method, for example:
[source, java]
----
pulsarTemplate
.newMessage(null)
.withTopic("my-topic")
.withSchema(Schema.STRING)
.withMessageCustomizer((mb) -> mb.key("key:1234"))
.send();
----
NOTE: When sending null values you must specify the schema type as the system can not determine the type of the message from a `null` payload.

[[tombstones.consume]]
== Consuming Null Payloads
For `@PulsarListener` and `@PulsarReader`, the `null` payload is passed into the listener method based on the type of its message parameter as follows:
|===
| Parameter type | Passed-in value

| primitive
| `null`

| user-defined
| `null`

| `org.apache.pulsar.client.api.Message<T>`
| non-null Pulsar message whose `getValue()` returns `null`

| `org.springframework.messaging.Message<T>`
| non-null Spring message whose `getPayload()` returns `PulsarNull`

| `List<X>`
| non-null list whose entries (`X`) are one of the above types and act accordingly (ie. primitive entries are `null` etc..)

| `org.apache.pulsar.client.api.Messages<T>`
| non-null container of non-null Pulsar messages whose `getValue()` returns `null`

|===

IMPORTANT: When the passed-in value is `null` (ie. single record listeners with primitive or user-defined types) you must use the `@Payload` parameter annotation with `required = false`.

IMPORTANT: When using the Spring `org.springframework.messaging.Message` for your listener payload type, its generic type information must be wide enough to accept `Message<PulsarNull>` (eg. `Message`, `Message<?>`, or `Message<Object>`).
This is due to the fact that the Spring Message does not allow null values for its payload and instead uses the `PulsarNull` placeholder.

If it is a tombstone message for a compacted log, you usually also need the key so that your application can determine which key was +++"+++`deleted`+++"+++.
The following example shows such a configuration:

[source, java]
----
@PulsarListener(
topics = "my-topic",
subscriptionName = "my-topic-sub",
schemaType = SchemaType.STRING)
void myListener(
@Payload(required = false) String msg,
@Header(PulsarHeaders.KEY) String key) {
...
}
----

NOTE: The `@PulsarReader` does not yet support `@Header` arguments, so it is less useful in the log compaction scenario.
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ private ResolvableType resolvableType(MethodParameter methodParameter) {
if (rawClass != null && isContainerType(rawClass)) {
resolvableType = resolvableType.getGeneric(0);
}
if (Message.class.isAssignableFrom(resolvableType.getRawClass())
|| org.springframework.messaging.Message.class.isAssignableFrom(resolvableType.getRawClass())) {
if (resolvableType.getRawClass() != null && (Message.class.isAssignableFrom(resolvableType.getRawClass())
|| org.springframework.messaging.Message.class.isAssignableFrom(resolvableType.getRawClass()))) {
resolvableType = resolvableType.getGeneric(0);
}
return resolvableType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
Expand All @@ -44,28 +42,22 @@
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
import org.apache.pulsar.reactive.client.api.MessageResult;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec;
import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.pulsar.annotation.EnablePulsar;
import org.springframework.pulsar.core.DefaultPulsarClientFactory;
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
import org.springframework.pulsar.core.DefaultSchemaResolver;
import org.springframework.pulsar.core.DefaultTopicResolver;
import org.springframework.pulsar.core.PulsarAdministration;
import org.springframework.pulsar.core.PulsarProducerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.core.PulsarTopic;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory;
Expand All @@ -74,18 +66,16 @@
import org.springframework.pulsar.reactive.config.annotation.EnableReactivePulsar;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListenerMessageConsumerBuilderCustomizer;
import org.springframework.pulsar.reactive.core.DefaultReactivePulsarConsumerFactory;
import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.BasicListenersTestCases.BasicListenersTestCasesConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersTest.PulsarListenerWithHeadersConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SchemaCustomMappingsTestCases.SchemaCustomMappingsTestConfig.User2;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.StreamingListenerTestCases.StreamingListenerTestCasesConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.WithDefaultType.WithDefaultTypeConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.WithSpecificTypes.WithSpecificTypesConfig;
import org.springframework.pulsar.support.PulsarHeaders;
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.util.ObjectUtils;

Expand All @@ -98,77 +88,11 @@
* @author Christophe Bornet
* @author Chris Bono
*/
@SpringJUnitConfig
@DirtiesContext
public class ReactivePulsarListenerTests implements PulsarTestContainerSupport {

@Autowired
PulsarTemplate<String> pulsarTemplate;

@Autowired
private PulsarClient pulsarClient;

@Configuration(proxyBeanMethods = false)
@EnableReactivePulsar
public static class TopLevelConfig {

@Bean
public PulsarProducerFactory<String> pulsarProducerFactory(PulsarClient pulsarClient) {
return new DefaultPulsarProducerFactory<>(pulsarClient);
}

@Bean
public PulsarClient pulsarClient() throws PulsarClientException {
return new DefaultPulsarClientFactory(PulsarTestContainerSupport.getPulsarBrokerUrl()).createClient();
}

@Bean
public ReactivePulsarClient pulsarReactivePulsarClient(PulsarClient pulsarClient) {
return AdaptedReactivePulsarClientFactory.create(pulsarClient);
}

@Bean
public PulsarTemplate<String> pulsarTemplate(PulsarProducerFactory<String> pulsarProducerFactory) {
return new PulsarTemplate<>(pulsarProducerFactory);
}

@SuppressWarnings("unchecked")
@Bean
public ConsumerTrackingReactivePulsarConsumerFactory<String> pulsarConsumerFactory(
ReactivePulsarClient pulsarClient,
ObjectProvider<ReactiveMessageConsumerBuilderCustomizer<String>> defaultConsumerCustomizersProvider) {
DefaultReactivePulsarConsumerFactory<String> consumerFactory = new DefaultReactivePulsarConsumerFactory<>(
pulsarClient, defaultConsumerCustomizersProvider.orderedStream().toList());
return new ConsumerTrackingReactivePulsarConsumerFactory<>(consumerFactory);
}

@Bean
ReactivePulsarListenerContainerFactory<String> reactivePulsarListenerContainerFactory(
ReactivePulsarConsumerFactory<String> pulsarConsumerFactory) {
return new DefaultReactivePulsarListenerContainerFactory<>(pulsarConsumerFactory,
new ReactivePulsarContainerProperties<>());
}

@Bean
PulsarAdministration pulsarAdministration() {
return new PulsarAdministration(PulsarTestContainerSupport.getHttpServiceUrl());
}

@Bean
PulsarTopic partitionedTopic() {
return PulsarTopic.builder("persistent://public/default/concurrency-on-pl").numberOfPartitions(3).build();
}

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<?> subscriptionInitialPositionEarliest() {
return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}

}
class ReactivePulsarListenerTests extends ReactivePulsarListenerTestsBase {

@Nested
@ContextConfiguration(classes = PulsarListenerBasicTestCases.TestPulsarListenersForBasicScenario.class)
class PulsarListenerBasicTestCases {
@ContextConfiguration(classes = BasicListenersTestCasesConfig.class)
class BasicListenersTestCases {

static CountDownLatch latch1 = new CountDownLatch(1);
static CountDownLatch latch2 = new CountDownLatch(1);
Expand Down Expand Up @@ -212,7 +136,7 @@ void testPulsarListenerWithTopicsPattern() throws Exception {

@EnableReactivePulsar
@Configuration
static class TestPulsarListenersForBasicScenario {
static class BasicListenersTestCasesConfig {

@ReactivePulsarListener(id = "id-1", topics = "topic-1", subscriptionName = "subscription-1",
consumerCustomizer = "listen1Customizer")
Expand Down Expand Up @@ -256,8 +180,8 @@ ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> listen3Customizer
}

@Nested
@ContextConfiguration(classes = PulsarListenerStreamingTestCases.TestPulsarListenersForStreaming.class)
class PulsarListenerStreamingTestCases {
@ContextConfiguration(classes = StreamingListenerTestCasesConfig.class)
class StreamingListenerTestCases {

static CountDownLatch latch1 = new CountDownLatch(10);
static CountDownLatch latch2 = new CountDownLatch(10);
Expand All @@ -280,7 +204,7 @@ void testPulsarListenerStreamingSpringMessage() throws Exception {

@EnableReactivePulsar
@Configuration
static class TestPulsarListenersForStreaming {
static class StreamingListenerTestCasesConfig {

@ReactivePulsarListener(topics = "streaming-1", stream = true,
consumerCustomizer = "subscriptionInitialPositionEarliest")
Expand Down
Loading

0 comments on commit 77462c2

Please sign in to comment.