Skip to content

Commit

Permalink
Commit offsets to kafka on reset if external state storage used (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
IharYakimush authored Jul 10, 2024
1 parent 9cb6589 commit f685a26
Show file tree
Hide file tree
Showing 17 changed files with 73 additions and 84 deletions.
3 changes: 1 addition & 2 deletions sample/Epam.Kafka.Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ private static void Main(string[] args)
// view metrics in console for demo purposes only
services.AddOpenTelemetry()
.WithMetrics(mb => mb
.AddMeter(PipelineMonitor.StatusMeterName, PipelineMonitor.HealthMeterName, Statistics.MeterName,
Statistics.TopicsMeterName)
.AddMeter(PipelineMonitor.StatusMeterName, PipelineMonitor.HealthMeterName, Statistics.MeterName)
.AddConsoleExporter());

KafkaBuilder kafkaBuilder = services.AddKafka()
Expand Down
5 changes: 3 additions & 2 deletions src/Epam.Kafka.PubSub/Subscription/State/BatchState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ public IReadOnlyCollection<ConsumeResult<TKey, TValue>> GetBatch<TKey, TValue>(

topic.ClearIfNotAssigned();

using (activitySpan.CreateSpan("assign"))
using (var span = activitySpan.CreateSpan("assign"))
{
this.AssignConsumer(topic, cancellationToken);
this.AssignConsumer(topic, span, cancellationToken);
}

cancellationToken.ThrowIfCancellationRequested();
Expand All @@ -35,6 +35,7 @@ public IReadOnlyCollection<ConsumeResult<TKey, TValue>> GetBatch<TKey, TValue>(
}

protected abstract void AssignConsumer<TKey, TValue>(SubscriptionTopicWrapper<TKey, TValue> topic,
ActivityWrapper activitySpan,
CancellationToken cancellationToken);

public void CommitResults<TKey, TValue>(SubscriptionTopicWrapper<TKey, TValue> topic,
Expand Down
5 changes: 4 additions & 1 deletion src/Epam.Kafka.PubSub/Subscription/State/CombinedState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ public CombinedState(TOffsetsStorage offsetsStorage)
}

protected override void AssignConsumer<TKey, TValue>(SubscriptionTopicWrapper<TKey, TValue> topic,
ActivityWrapper activitySpan,
CancellationToken cancellationToken)
{
topic.ExternalState = list => topic.GetAndResetState(this._offsetsStorage, list, cancellationToken);

base.AssignConsumer(topic, cancellationToken);
base.AssignConsumer(topic, activitySpan, cancellationToken);

if (topic.Consumer.Assignment.Count > 0)
{
Expand All @@ -49,6 +50,8 @@ protected override void AssignConsumer<TKey, TValue>(SubscriptionTopicWrapper<TK
}

topic.OnReset(reset);

topic.CommitOffsetIfNeeded(activitySpan, reset);
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/Epam.Kafka.PubSub/Subscription/State/ExternalState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public ExternalState(TOffsetsStorage offsetsStorage)
}

protected override void AssignConsumer<TKey, TValue>(SubscriptionTopicWrapper<TKey, TValue> topic,
ActivityWrapper activitySpan,
CancellationToken cancellationToken)
{
if (topic == null)
Expand Down Expand Up @@ -63,6 +64,8 @@ protected override void AssignConsumer<TKey, TValue>(SubscriptionTopicWrapper<TK
}

topic.OnReset(reset);

topic.CommitOffsetIfNeeded(activitySpan, reset);
}

protected override IReadOnlyCollection<TopicPartitionOffset> CommitState<TKey, TValue>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ public static IReadOnlyCollection<TopicPartitionOffset> CommitState<TKey, TValue
CancellationToken cancellationToken)
{
if (topic == null)
{
throw new ArgumentNullException(nameof(topic));
}

if (offsets == null)
{
throw new ArgumentNullException(nameof(offsets));
}

var reset = new List<TopicPartitionOffset>();
var committed = new List<TopicPartitionOffset>();
Expand Down Expand Up @@ -53,21 +57,53 @@ public static IReadOnlyCollection<TopicPartitionOffset> CommitState<TKey, TValue

topic.OnReset(reset);

topic.CommitOffsetIfNeeded(activitySpan, newState);

return committed;
}

public static void CommitOffsetIfNeeded<TKey, TValue>(
this SubscriptionTopicWrapper<TKey, TValue> topic,
ActivityWrapper activitySpan,
IReadOnlyCollection<TopicPartitionOffset> offsets)
{
if (topic.Options.ExternalStateCommitToKafka)
{
try
{
topic.CommitOffsets(activitySpan, committed);
List<TopicPartitionOffset> toCommit = new();

foreach (TopicPartitionOffset item in offsets)
{
if (item.Offset.Value >= 0)
{
toCommit.Add(item);
}
else if (item.Offset == Offset.Beginning)
{
var w = topic.Consumer.GetWatermarkOffsets(item.TopicPartition);

if (w.Low == Offset.Unset)
{
w = topic.Consumer.QueryWatermarkOffsets(item.TopicPartition, TimeSpan.FromSeconds(5));
}

if (w.Low.Value >= 0)
{
toCommit.Add(new TopicPartitionOffset(item.TopicPartition, w.Low));
}
}
}

topic.CommitOffsets(activitySpan, toCommit);
}
catch (KafkaException exception)
{
topic.Logger.KafkaCommitFailed(exception, topic.Monitor.Name, committed);
topic.Logger.KafkaCommitFailed(exception, topic.Monitor.Name, offsets);

// ignore exception because external provider is a single point of truth for offsets.
// failed commit will trigger offset reset on next batch iteration
}
}

return committed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Epam.Kafka.PubSub.Subscription.State;
internal class InternalKafkaState : BatchState
{
protected override void AssignConsumer<TKey, TValue>(SubscriptionTopicWrapper<TKey, TValue> topic,
ActivityWrapper activitySpan,
CancellationToken cancellationToken)
{
if (topic == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,13 @@ private void ConfigureConsumerConfig(ConsumerConfig config)

public void CommitOffsets(ActivityWrapper activitySpan, IReadOnlyCollection<TopicPartitionOffset> offsets)
{
using (activitySpan.CreateSpan("commit_kafka"))
if (offsets.Count > 0)
{
using var span = activitySpan.CreateSpan("commit_kafka");
// commit to kafka also
this.Consumer.Commit(offsets);

span.SetResult(offsets);
}
}

Expand Down
5 changes: 1 addition & 4 deletions src/Epam.Kafka/Epam.Kafka.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Simplify Confluent.Kafka usage by leveraging best patterns and practices:
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.4.0" />
<PackageReference Include="Confluent.SchemaRegistry" Version="2.4.0" />
<PackageReference Include="System.Text.Json" Version="8.0.4" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net6.0' or '$(TargetFramework)' == 'net462' or '$(TargetFramework)' == 'netstandard2.0'">
Expand All @@ -21,10 +22,6 @@ Simplify Confluent.Kafka usage by leveraging best patterns and practices:
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="6.0.0" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net462' or '$(TargetFramework)' == 'netstandard2.0'">
<PackageReference Include="System.Text.Json" Version="6.0.0" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net8.0'">
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
Expand Down
21 changes: 0 additions & 21 deletions src/Epam.Kafka/Internals/Metrics/ConsumerTopicsMetrics.cs

This file was deleted.

16 changes: 0 additions & 16 deletions src/Epam.Kafka/Internals/Metrics/StatisticsMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,4 @@ public void OnNext(Statistics value)
}

protected abstract void Create();

protected IEnumerable<KeyValuePair<string, object?>> BuildTpTags(string topic, long partition)
{
return this.Tags.Concat(
new Dictionary<string, object?>
{
{ "topic", topic },
{ "partition", partition }
});
}

protected Measurement<long> CreateStatusMetric(string value)
{
return new Measurement<long>(this.Latest.EpochTimeSeconds,
this.Tags.Concat(Enumerable.Repeat(new KeyValuePair<string, object?>("state", value), 1)));
}
}
4 changes: 0 additions & 4 deletions src/Epam.Kafka/Internals/Observable/ObservableConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ internal class ObservableConsumer<TKey, TValue> : ObservableClient, IConsumer<TK
{
private readonly IConsumer<TKey, TValue> _inner;
private readonly Meter? _meter;
private readonly Meter? _topicsMeter;

public ObservableConsumer(ConsumerBuilder<TKey, TValue> builder, bool metrics)
{
Expand All @@ -36,9 +35,7 @@ public ObservableConsumer(ConsumerBuilder<TKey, TValue> builder, bool metrics)
if (metrics)
{
this._meter = new Meter(Statistics.MeterName);
this._topicsMeter = new Meter(Statistics.TopicsMeterName);
this.StatObservers.Add(new ConsumerMetrics(this._meter));
this.StatObservers.Add(new ConsumerTopicsMetrics(this._topicsMeter));
}
}
catch (InvalidOperationException)
Expand All @@ -62,7 +59,6 @@ public override void Dispose()
this.ClearObservers();

this._meter?.Dispose();
this._topicsMeter?.Dispose();
}
}

Expand Down
5 changes: 0 additions & 5 deletions src/Epam.Kafka/Stats/Statistics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@ public class Statistics
/// </summary>
public const string MeterName = "Epam.Kafka.Statistics";

/// <summary>
/// Name of <see cref="Meter"/> used to expose topics statistics if <see cref="KafkaConfigExtensions.DotnetStatisticMetricsKey"/> enable it.
/// </summary>
public const string TopicsMeterName = "Epam.Kafka.Statistics.Topics";

/// <summary>
/// Create new instance of <see cref="Statistics"/> object from json representation.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public async Task OneRunningOneStarting()

// iteration 3
observer.AssertStart();
observer.AssertAssign();
observer.AssertAssign(true);
observer.AssertRead(5);
observer.AssertProcess();
observer.AssertCommitExternal();
Expand Down Expand Up @@ -139,7 +139,7 @@ public async Task AllPausedStartOne()

// iteration 3
observer.AssertStart();
observer.AssertAssign();
observer.AssertAssign(true);
observer.AssertRead(5);
observer.AssertProcess();
observer.AssertCommitExternal();
Expand Down Expand Up @@ -207,7 +207,7 @@ public async Task AllPausedStopOne(bool onCommit)
observer.AssertStop(SubscriptionBatchResult.Processed);

// iteration 3
observer.AssertSubEmpty();
observer.AssertSubEmpty(!onCommit);
}

[Theory]
Expand Down Expand Up @@ -275,7 +275,7 @@ public async Task ResetToBeginning(bool onCommit)

// iteration 3
observer.AssertStart();
observer.AssertAssign();
observer.AssertAssign(!onCommit);
observer.AssertRead(5);
observer.AssertProcess();
observer.AssertCommitExternal();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public async Task OneRunningOneStarting()

// iteration 2
observer.AssertStart();
observer.AssertAssign();
observer.AssertAssign(true);
observer.AssertRead(5);
observer.AssertProcess();
observer.AssertCommitExternal();
Expand Down Expand Up @@ -134,7 +134,7 @@ public async Task AllPausedStartOne()

// iteration 2
observer.AssertStart();
observer.AssertAssign();
observer.AssertAssign(true);
observer.AssertRead(5);
observer.AssertProcess();
observer.AssertCommitExternal();
Expand Down Expand Up @@ -203,6 +203,6 @@ public async Task AllPausedStopOne(bool onCommit)
observer.AssertStop(SubscriptionBatchResult.Processed);

// iteration 2
observer.AssertSubEmpty();
observer.AssertSubEmpty(!onCommit);
}
}
12 changes: 9 additions & 3 deletions tests/Epam.Kafka.PubSub.Tests/Helpers/AssertExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@ public static void AssertSubNotAssigned(this TestObserver observer)
observer.AssertStop(SubscriptionBatchResult.NotAssigned);
}

public static void AssertAssign(this TestObserver observer)
public static void AssertAssign(this TestObserver observer, bool offsetsCommit = false)
{
observer.AssertNextActivity("assign.Start");

if (offsetsCommit)
{
observer.AssertCommitKafka();
}

observer.AssertNextActivity("assign.Stop");
}

Expand Down Expand Up @@ -54,10 +60,10 @@ public static void AssertProcess(this TestObserver observer)
observer.AssertNextActivity("process.Stop");
}

public static void AssertSubEmpty(this TestObserver observer)
public static void AssertSubEmpty(this TestObserver observer, bool offsetsCommit = false)
{
observer.AssertStart();
observer.AssertAssign();
observer.AssertAssign(offsetsCommit);
observer.AssertRead(0);
observer.AssertStop(SubscriptionBatchResult.Empty);
}
Expand Down
1 change: 0 additions & 1 deletion tests/Epam.Kafka.Tests/Epam.Kafka.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ namespace Epam.Kafka.Stats
public class Statistics
{
public const string MeterName = "Epam.Kafka.Statistics";
public const string TopicsMeterName = "Epam.Kafka.Statistics.Topics";
public Statistics() { }
[System.Text.Json.Serialization.JsonPropertyName("age")]
public long AgeMicroseconds { get; set; }
Expand Down
Loading

0 comments on commit f685a26

Please sign in to comment.