Skip to content

Commit

Permalink
Improve statistics metrics (#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
IharYakimush authored Jan 29, 2025
1 parent 18db906 commit bd80611
Show file tree
Hide file tree
Showing 23 changed files with 709 additions and 194 deletions.
14 changes: 13 additions & 1 deletion src/Epam.Kafka/Internals/ClientWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,21 @@ namespace Epam.Kafka.Internals;

internal abstract class ClientWrapper : IClient
{
private bool _disposed;
protected abstract IClient Inner { get; }

public abstract void Dispose();
public virtual void Dispose()
{
this._disposed = true;
}

protected void EnsureNotDisposed()
{
if (this._disposed)
{
throw new ObjectDisposedException(this.GetType().Name);
}
}

public int AddBrokers(string brokers)
{
Expand Down
61 changes: 25 additions & 36 deletions src/Epam.Kafka/Internals/Observable/ObservableClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

namespace Epam.Kafka.Internals.Observable;

#pragma warning disable CA1031 // notify other listeners event if one of them failed
#pragma warning disable CA1031 // notify other listeners even if one of them failed

internal abstract class ObservableClient : ClientWrapper, IObservable<Error>, IObservable<string>, IObservable<Statistics>
{
private readonly ParseStatsJsonObserver _parseObserver = new();

protected List<IObserver<Error>>? ErrorObservers { get; set; }
protected List<IObserver<string>>? StatObservers { get; set; }

Expand All @@ -23,7 +25,7 @@ protected void StatisticsHandler(string json)
}
catch
{
// notify other listeners event if one of them failed
// notify other listeners even if one of them failed
}
}
}
Expand All @@ -38,46 +40,43 @@ protected void ErrorHandler(Error error)
}
catch
{
// notify other listeners event if one of them failed
// notify other listeners even if one of them failed
}
}
}

protected void ClearObservers()
protected void CompleteObservers()
{
ClearObservers(this.ErrorObservers);
ClearObservers(this.StatObservers);
CompleteObservers(this.ErrorObservers);
CompleteObservers(this.StatObservers);
}

private static void ClearObservers<T>(List<IObserver<T>>? items)
private static void CompleteObservers<T>(List<IObserver<T>>? items)
{
if (items == null)
{
return;
}

foreach (IObserver<T> item in items.ToArray())
foreach (IObserver<T> item in items)
{
if (items.Contains(item))
try
{
item.OnCompleted();
}
catch
{
try
{
item.OnCompleted();
}
catch
{
// notify other listeners event if one of them failed
}
// notify other listeners even if one of them failed
}
}

items.Clear();
}

public IDisposable Subscribe(IObserver<Error> observer)
{
if (observer == null) throw new ArgumentNullException(nameof(observer));

this.EnsureNotDisposed();

if (this.ErrorObservers == null)
{
throw new InvalidOperationException(
Expand All @@ -96,6 +95,8 @@ public IDisposable Subscribe(IObserver<string> observer)
{
if (observer == null) throw new ArgumentNullException(nameof(observer));

this.EnsureNotDisposed();

if (this.StatObservers == null)
{
throw new InvalidOperationException(
Expand All @@ -114,24 +115,12 @@ public IDisposable Subscribe(IObserver<Statistics> observer)
{
if (observer == null) throw new ArgumentNullException(nameof(observer));

return this.Subscribe(new ParseStatsJsonObserver(observer));
}

private class Unsubscriber<T> : IDisposable
{
private readonly List<IObserver<T>> _observers;
private readonly IObserver<T> _observer;

public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
{
this._observers = observers;
this._observer = observer;
}
this.EnsureNotDisposed();

public void Dispose()
{
this._observers.Remove(this._observer);
}
#pragma warning disable CA2000 // don't need to unsubscribe
this.Subscribe(this._parseObserver);
#pragma warning restore CA2000
return this._parseObserver.Subscribe(observer);
}
}

Expand Down
79 changes: 73 additions & 6 deletions src/Epam.Kafka/Internals/Observable/ObservableConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright © 2024 EPAM Systems

using Confluent.Kafka;

using Epam.Kafka.Metrics;

namespace Epam.Kafka.Internals.Observable;
Expand Down Expand Up @@ -39,170 +40,236 @@ public ObservableConsumer(ConsumerBuilder<TKey, TValue> builder)
this._inner = builder.Build();
}

protected override IClient Inner => this._inner;
protected override IClient Inner
{
get
{
this.EnsureNotDisposed();
return this._inner;
}
}

public override void Dispose()
{
base.Dispose();

try
{
this._inner.Dispose();
}
finally
{
this.ClearObservers();
this.CompleteObservers();
}
}

public ConsumeResult<TKey, TValue> Consume(int millisecondsTimeout)
{
this.EnsureNotDisposed();
return this._inner.Consume(millisecondsTimeout);
}

public ConsumeResult<TKey, TValue> Consume(CancellationToken cancellationToken = new CancellationToken())
{
this.EnsureNotDisposed();
return this._inner.Consume(cancellationToken);
}

public ConsumeResult<TKey, TValue> Consume(TimeSpan timeout)
{
this.EnsureNotDisposed();
return this._inner.Consume(timeout);
}

public void Subscribe(IEnumerable<string> topics)
{
this.EnsureNotDisposed();
this._inner.Subscribe(topics);
}

public void Subscribe(string topic)
{
this.EnsureNotDisposed();
this._inner.Subscribe(topic);
}

public void Unsubscribe()
{
this.EnsureNotDisposed();
this._inner.Unsubscribe();
}

public void Assign(TopicPartition partition)
{
this.EnsureNotDisposed();
this._inner.Assign(partition);
}

public void Assign(TopicPartitionOffset partition)
{
this.EnsureNotDisposed();
this._inner.Assign(partition);
}

public void Assign(IEnumerable<TopicPartitionOffset> partitions)
{
this.EnsureNotDisposed();
this._inner.Assign(partitions);
}

public void Assign(IEnumerable<TopicPartition> partitions)
{
this.EnsureNotDisposed();
this._inner.Assign(partitions);
}

public void IncrementalAssign(IEnumerable<TopicPartitionOffset> partitions)
{
this.EnsureNotDisposed();
this._inner.IncrementalAssign(partitions);
}

public void IncrementalAssign(IEnumerable<TopicPartition> partitions)
{
this.EnsureNotDisposed();
this._inner.IncrementalAssign(partitions);
}

public void IncrementalUnassign(IEnumerable<TopicPartition> partitions)
{
this.EnsureNotDisposed();
this._inner.IncrementalUnassign(partitions);
}

public void Unassign()
{
this.EnsureNotDisposed();
this._inner.Unassign();
}

public void StoreOffset(ConsumeResult<TKey, TValue> result)
{
this.EnsureNotDisposed();
this._inner.StoreOffset(result);
}

public void StoreOffset(TopicPartitionOffset offset)
{
this.EnsureNotDisposed();
this._inner.StoreOffset(offset);
}

public List<TopicPartitionOffset> Commit()
{
this.EnsureNotDisposed();
return this._inner.Commit();
}

public void Commit(IEnumerable<TopicPartitionOffset> offsets)
{
this.EnsureNotDisposed();
this._inner.Commit(offsets);
}

public void Commit(ConsumeResult<TKey, TValue> result)
{
this.EnsureNotDisposed();
this._inner.Commit(result);
}

public void Seek(TopicPartitionOffset tpo)
{
this.EnsureNotDisposed();
this._inner.Seek(tpo);
}

public void Pause(IEnumerable<TopicPartition> partitions)
{
this.EnsureNotDisposed();
this._inner.Pause(partitions);
}

public void Resume(IEnumerable<TopicPartition> partitions)
{
this.EnsureNotDisposed();
this._inner.Resume(partitions);
}

public List<TopicPartitionOffset> Committed(TimeSpan timeout)
{
this.EnsureNotDisposed();
return this._inner.Committed(timeout);
}

public List<TopicPartitionOffset> Committed(IEnumerable<TopicPartition> partitions, TimeSpan timeout)
{
this.EnsureNotDisposed();
return this._inner.Committed(partitions, timeout);
}

public Offset Position(TopicPartition partition)
{
this.EnsureNotDisposed();
return this._inner.Position(partition);
}

public List<TopicPartitionOffset> OffsetsForTimes(IEnumerable<TopicPartitionTimestamp> timestampsToSearch, TimeSpan timeout)
{
this.EnsureNotDisposed();
return this._inner.OffsetsForTimes(timestampsToSearch, timeout);
}

public WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition)
{
this.EnsureNotDisposed();
return this._inner.GetWatermarkOffsets(topicPartition);
}

public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)
{
this.EnsureNotDisposed();
return this._inner.QueryWatermarkOffsets(topicPartition, timeout);
}

public void Close()
{
this.EnsureNotDisposed();
this._inner.Close();
}

public string MemberId => this._inner.MemberId;
public string MemberId
{
get
{
this.EnsureNotDisposed();
return this._inner.MemberId;
}
}

public List<TopicPartition> Assignment => this._inner.Assignment;
public List<TopicPartition> Assignment
{
get
{
this.EnsureNotDisposed();
return this._inner.Assignment;
}
}

public List<string> Subscription => this._inner.Subscription;
public List<string> Subscription
{
get
{
this.EnsureNotDisposed();
return this._inner.Subscription;
}
}

public IConsumerGroupMetadata ConsumerGroupMetadata => this._inner.ConsumerGroupMetadata;
public IConsumerGroupMetadata ConsumerGroupMetadata
{
get
{
this.EnsureNotDisposed();
return this._inner.ConsumerGroupMetadata;
}
}
}
Loading

0 comments on commit bd80611

Please sign in to comment.