Skip to content

Commit

Permalink
Sub pause rework (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
IharYakimush authored Jul 15, 2024
1 parent c95bd28 commit f453f76
Show file tree
Hide file tree
Showing 26 changed files with 1,055 additions and 268 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public IReadOnlyCollection<TopicPartitionOffset> GetOrCreate(IReadOnlyCollection

if (local != null)
{
result.Add(new TopicPartitionOffset(item, local.Pause ? Offset.End : local.Offset));
result.Add(new TopicPartitionOffset(item, local.Pause ? ExternalOffset.Paused : local.Offset));
}
else
{
Expand Down Expand Up @@ -109,9 +109,9 @@ public IReadOnlyCollection<TopicPartitionOffset> CommitOrReset(
KafkaTopicState local = locals.Single(x =>
x.Topic == item.Topic && x.Partition == item.Partition && x.ConsumerGroup == consumerGroup);

if (local.Pause && item.Offset == Offset.End)
if (local.Pause && item.Offset == ExternalOffset.Paused)
{
// don't update paused topics with standard Offset.End value
// don't update paused topics with standard value.
}
else
{
Expand Down Expand Up @@ -176,7 +176,7 @@ private static List<TopicPartitionOffset> GetLocalState(

if (local != null)
{
result.Add(new TopicPartitionOffset(item, local.Pause ? Offset.End : local.Offset));
result.Add(new TopicPartitionOffset(item, local.Pause ? ExternalOffset.Paused : local.Offset));
}
else
{
Expand Down
16 changes: 16 additions & 0 deletions src/Epam.Kafka.PubSub/Subscription/ExternalOffset.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright © 2024 EPAM Systems

using Confluent.Kafka;

namespace Epam.Kafka.PubSub.Subscription;

/// <summary>
/// Holds additional special offsets supported by <see cref="IExternalOffsetsStorage"/> only.
/// </summary>
public static class ExternalOffset
{
/// <summary>
/// Indicates that subscription should be paused
/// </summary>
public static Offset Paused { get; } = -863;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public enum SubscriptionBatchResult
NotAssigned = 4,

/// <summary>
/// All assigned topic partitions paused (pointing to <see cref="Offset.End"/> special value). Valid only for external and combined offsets storage.
/// All assigned topic partitions paused (pointing to <see cref="ExternalOffset.Paused"/> special value). Valid only for external and combined offsets storage.
/// </summary>
Paused = 5
}
7 changes: 4 additions & 3 deletions src/Epam.Kafka.PubSub/Subscription/State/BatchState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ namespace Epam.Kafka.PubSub.Subscription.State;

internal abstract class BatchState
{
public IReadOnlyCollection<ConsumeResult<TKey, TValue>> GetBatch<TKey, TValue>(
public bool GetBatch<TKey, TValue>(
SubscriptionTopicWrapper<TKey, TValue> topic,
ActivityWrapper activitySpan,
out IReadOnlyCollection<ConsumeResult<TKey, TValue>> batch,
CancellationToken cancellationToken)
{
if (topic == null)
Expand All @@ -26,12 +27,12 @@ public IReadOnlyCollection<ConsumeResult<TKey, TValue>> GetBatch<TKey, TValue>(

cancellationToken.ThrowIfCancellationRequested();

IReadOnlyCollection<ConsumeResult<TKey, TValue>> batch = topic.GetBatch(activitySpan, cancellationToken);
batch = topic.GetBatch(activitySpan, cancellationToken);

// try to throw handler assign exception after read to be able to do it after potential re-balance in same batch.
topic.ThrowIfNeeded();

return batch;
return topic.UnassignedBeforeRead;
}

protected abstract void AssignConsumer<TKey, TValue>(SubscriptionTopicWrapper<TKey, TValue> topic,
Expand Down
19 changes: 9 additions & 10 deletions src/Epam.Kafka.PubSub/Subscription/State/CombinedState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,28 @@ protected override void AssignConsumer<TKey, TValue>(SubscriptionTopicWrapper<TK
if (topic.Consumer.Assignment.Count > 0)
{
var reset = new List<TopicPartitionOffset>();
var pause = new List<TopicPartition>();

IReadOnlyCollection<TopicPartitionOffset> state = this._offsetsStorage.GetOrCreate(
topic.Consumer.Assignment, topic.ConsumerGroup,
cancellationToken);

foreach (TopicPartitionOffset item in state)
{
if (!topic.Offsets.TryGetValue(item.TopicPartition, out Offset previous))
if (topic.TryGetOffset(item.TopicPartition, out Offset previous))
{
reset.Add(item);
continue; // Don't need to seek if previous offset unavailable
}

// don't reset paused offset
if (previous != item.Offset)
{
reset.Add(item);
topic.Seek(item);
// don't reset paused offset
if (previous != item.Offset)
{
ExternalStateExtensions.PauseOrReset(topic, item, pause, reset);
}
}
}

topic.OnReset(reset);

topic.OnPause(pause);

topic.CommitOffsetIfNeeded(activitySpan, reset);
}
}
Expand Down
33 changes: 19 additions & 14 deletions src/Epam.Kafka.PubSub/Subscription/State/ExternalState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,42 +30,47 @@ protected override void AssignConsumer<TKey, TValue>(SubscriptionTopicWrapper<TK
IReadOnlyCollection<TopicPartitionOffset> state =
topic.GetAndResetState(this._offsetsStorage, topicPartitions, cancellationToken);

var pause = new List<TopicPartition>();
var reset = new List<TopicPartitionOffset>();
var assign = new List<TopicPartitionOffset>();
var assignNonPaused = new List<TopicPartitionOffset>();

foreach (TopicPartitionOffset item in state)
{
// existing assignment, check if offset reset
if (topic.Consumer.Assignment.Contains(item.TopicPartition))
{
if (topic.Offsets[item.TopicPartition] != item.Offset)
if (topic.TryGetOffset(item.TopicPartition, out var tp) && tp != item.Offset)
{
TopicPartitionOffset tpo = new(item.TopicPartition, item.Offset);

topic.Seek(tpo);

reset.Add(tpo);
ExternalStateExtensions.PauseOrReset(topic, item, pause, reset);
}
}
else
{
TopicPartitionOffset tpo = new(item.TopicPartition, item.Offset);
topic.Offsets[item.TopicPartition] = item.Offset;

// first assign offset.end, than pause consumer
if (tpo.Offset == ExternalOffset.Paused)
{
pause.Add(item.TopicPartition);
tpo = new(item.TopicPartition, Offset.End);
}
else
{
assignNonPaused.Add(tpo);
}

assign.Add(tpo);
}
}

if (assign.Count > 0)
{
topic.Consumer.Assign(assign);

topic.Logger.PartitionsAssigned(topic.Monitor.Name, null, assign);
}
topic.OnAssign(assign);

topic.OnReset(reset);

topic.CommitOffsetIfNeeded(activitySpan, reset);
topic.OnPause(pause);

topic.CommitOffsetIfNeeded(activitySpan, reset.Concat(assignNonPaused));
}

protected override IReadOnlyCollection<TopicPartitionOffset> CommitState<TKey, TValue>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public static IReadOnlyCollection<TopicPartitionOffset> CommitState<TKey, TValue
throw new ArgumentNullException(nameof(offsets));
}

var pause = new List<TopicPartition>();
var reset = new List<TopicPartitionOffset>();
var committed = new List<TopicPartitionOffset>();
IReadOnlyCollection<TopicPartitionOffset> newState;
Expand All @@ -47,32 +48,48 @@ public static IReadOnlyCollection<TopicPartitionOffset> CommitState<TKey, TValue
}
else
{
TopicPartitionOffset tpo = new(item.TopicPartition, item.Offset);

topic.Seek(tpo);

reset.Add(tpo);
PauseOrReset(topic, item, pause, reset);
}
}

topic.OnReset(reset);

topic.OnPause(pause);

topic.CommitOffsetIfNeeded(activitySpan, newState);

return committed;
}

public static void PauseOrReset<TKey, TValue>(
SubscriptionTopicWrapper<TKey, TValue> topic,
TopicPartitionOffset item,
List<TopicPartition> pause,
List<TopicPartitionOffset> reset)
{
if (item.Offset == ExternalOffset.Paused)
{
pause.Add(item.TopicPartition);
}
else
{
TopicPartitionOffset tpo = new(item.TopicPartition, item.Offset);
topic.Seek(tpo);
reset.Add(tpo);
}
}

public static void CommitOffsetIfNeeded<TKey, TValue>(
this SubscriptionTopicWrapper<TKey, TValue> topic,
ActivityWrapper activitySpan,
IReadOnlyCollection<TopicPartitionOffset> offsets)
IEnumerable<TopicPartitionOffset> offsets)
{
if (topic.Options.ExternalStateCommitToKafka)
{
List<TopicPartitionOffset> toCommit = new();

try
{
List<TopicPartitionOffset> toCommit = new();

foreach (TopicPartitionOffset item in offsets)
{
if (item.Offset.Value >= 0)
Expand All @@ -93,13 +110,18 @@ public static void CommitOffsetIfNeeded<TKey, TValue>(
toCommit.Add(new TopicPartitionOffset(item.TopicPartition, w.Low));
}
}
else if (item.Offset == Offset.End)
{
var w = topic.Consumer.QueryWatermarkOffsets(item.TopicPartition, TimeSpan.FromSeconds(5));
toCommit.Add(new TopicPartitionOffset(item.TopicPartition, w.High));
}
}

topic.CommitOffsets(activitySpan, toCommit);
}
catch (KafkaException exception)
{
topic.Logger.KafkaCommitFailed(exception, topic.Monitor.Name, offsets);
topic.Logger.KafkaCommitFailed(exception, topic.Monitor.Name, toCommit);

// ignore exception because external provider is a single point of truth for offsets.
// failed commit will trigger offset reset on next batch iteration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,10 @@ protected override void ExecuteBatch(
{
BatchState state = ResolveRequiredService<BatchState>(sp, this.Options.StateType);

bool unassignedBeforeRead = topic.Consumer.Assignment.Count == 0;

this.Monitor.Batch.Update(BatchStatus.Reading);

IReadOnlyCollection<ConsumeResult<TKey, TValue>> batch = state.GetBatch(
topic, activitySpan, cancellationToken);
bool unassignedBeforeRead = state.GetBatch(
topic, activitySpan, out var batch, cancellationToken);

cancellationToken.ThrowIfCancellationRequested();

Expand Down Expand Up @@ -141,7 +139,7 @@ protected override void ExecuteBatch(
List<TopicPartition> assignments = topic.Consumer.Assignment;

if (assignments.Count > 0 &&
assignments.All(x => topic.Offsets.TryGetValue(x, out Offset offset) && offset == Offset.End))
assignments.All(x => topic.TryGetOffset(x, out Offset offset) && offset == ExternalOffset.Paused))
{
this.Monitor.Result.Update(SubscriptionBatchResult.Paused);

Expand Down
Loading

0 comments on commit f453f76

Please sign in to comment.