Skip to content

Commit

Permalink
feat: Allow empty payloads in Kafka, in order to support compaction t…
Browse files Browse the repository at this point in the history
…ombstones (cloudevents#224)

Signed-off-by: Magne Helleborg <[email protected]>
  • Loading branch information
mhelleborg authored Aug 30, 2022
1 parent d9e10ea commit 81c82e0
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 2 deletions.
2 changes: 0 additions & 2 deletions src/CloudNative.CloudEvents.Kafka/KafkaExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ private static void InitPartitioningKey(Message<string?, byte[]> message, CloudE
Validation.CheckCloudEventArgument(cloudEvent, nameof(cloudEvent));
Validation.CheckNotNull(formatter, nameof(formatter));

// TODO: Is this appropriate? Why can't we transport a CloudEvent without data in Kafka?
Validation.CheckArgument(cloudEvent.Data is object, nameof(cloudEvent), "Only CloudEvents with data can be converted to Kafka messages");
var headers = MapHeaders(cloudEvent);
string? key = (string?) cloudEvent[Partitioning.PartitionKeyAttribute];
byte[] value;
Expand Down
51 changes: 51 additions & 0 deletions test/CloudNative.CloudEvents.UnitTests/Kafka/KafkaTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,57 @@ public void KafkaBinaryMessageTest()
Assert.Equal("value", (string?)receivedCloudEvent["comexampleextension1"]);
}

[Theory]
[InlineData(MediaTypeNames.Application.Octet, new byte[0])]
[InlineData(MediaTypeNames.Application.Json, null)]
[InlineData(MediaTypeNames.Application.Xml, "")]
[InlineData(MediaTypeNames.Text.Plain, "")]
[InlineData(null, null)]
public void KafkaBinaryMessageTombstoneTest(string contentType, object? expectedDecodedResult)
{
var jsonEventFormatter = new JsonEventFormatter();
var cloudEvent = new CloudEvent(Partitioning.AllAttributes)
{
Type = "com.github.pull.create",
Source = new Uri("https://github.com/cloudevents/spec/pull/123"),
Id = "A234-1234-1234",
Time = new DateTimeOffset(2018, 4, 5, 17, 31, 0, TimeSpan.Zero),
DataContentType = contentType,
Data = null,
["comexampleextension1"] = "value",
[Partitioning.PartitionKeyAttribute] = "hello much wow"
};

var message = cloudEvent.ToKafkaMessage(ContentMode.Binary, new JsonEventFormatter());
Assert.True(message.IsCloudEvent());

// Sending an empty message is equivalent to a delete (tombstone) for that partition key, when using compacted topics in Kafka.
// This is the main use case for empty data messages with Kafka.
Assert.Empty(message.Value);

// Using serialization to create fully independent copy thus simulating message transport.
// The real transport will work in a similar way.
var serialized = JsonConvert.SerializeObject(message, new HeaderConverter());
var settings = new JsonSerializerSettings
{
Converters = { new HeadersConverter(), new HeaderConverter() }
};
var messageCopy = JsonConvert.DeserializeObject<Message<string?, byte[]>>(serialized, settings)!;

Assert.True(messageCopy.IsCloudEvent());
var receivedCloudEvent = messageCopy.ToCloudEvent(jsonEventFormatter, Partitioning.AllAttributes);

Assert.Equal(CloudEventsSpecVersion.Default, receivedCloudEvent.SpecVersion);
Assert.Equal("com.github.pull.create", receivedCloudEvent.Type);
Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull/123"), receivedCloudEvent.Source);
Assert.Equal("A234-1234-1234", receivedCloudEvent.Id);
AssertTimestampsEqual("2018-04-05T17:31:00Z", receivedCloudEvent.Time!.Value);
Assert.Equal(contentType, receivedCloudEvent.DataContentType);
Assert.Equal(expectedDecodedResult, receivedCloudEvent.Data);
Assert.Equal("hello much wow", (string?)receivedCloudEvent[Partitioning.PartitionKeyAttribute]);
Assert.Equal("value", (string?)receivedCloudEvent["comexampleextension1"]);
}

[Fact]
public void ContentTypeCanBeInferredByFormatter()
{
Expand Down

0 comments on commit 81c82e0

Please sign in to comment.