From bd806116d9f5959366314b52f8e0018b7c6b20e6 Mon Sep 17 00:00:00 2001 From: IharYakimush Date: Wed, 29 Jan 2025 14:48:49 +0300 Subject: [PATCH] Improve statistics metrics (#63) --- src/Epam.Kafka/Internals/ClientWrapper.cs | 14 ++- .../Internals/Observable/ObservableClient.cs | 61 +++++------ .../Observable/ObservableConsumer.cs | 79 ++++++++++++-- .../Observable/ObservableProducer.cs | 27 ++++- .../Internals/Observable/Unsubscriber.cs | 20 ++++ src/Epam.Kafka/Internals/SharedClient.cs | 16 ++- .../{TopLevelMetrics.cs => CommonMetrics.cs} | 8 +- src/Epam.Kafka/Metrics/ConsumerMetrics.cs | 11 +- src/Epam.Kafka/Metrics/ProducerMetrics.cs | 8 +- src/Epam.Kafka/Metrics/StatisticsMetrics.cs | 90 +++++++++++----- src/Epam.Kafka/Statistics.cs | 17 ++- src/Epam.Kafka/Stats/BrokerStatistics.cs | 3 +- src/Epam.Kafka/Stats/JsonContext.cs | 5 +- .../Stats/ParseStatsJsonObserver.cs | 67 +++++++++--- src/Epam.Kafka/Stats/PartitionStatistics.cs | 24 +++++ src/Epam.Kafka/Stats/TopicStatistics.cs | 12 +++ src/Epam.Kafka/Stats/TransactionStatistics.cs | 41 +++++++ src/Epam.Kafka/Stats/WindowStats.cs | 41 +++++++ tests/Epam.Kafka.Tests/Common/MeterHelper.cs | 66 ++++++++++++ tests/Epam.Kafka.Tests/Data/ConsumerStat.json | 48 ++++----- .../Epam.Kafka.Tests/Epam.Kafka.approved.txt | 47 +++++++- tests/Epam.Kafka.Tests/KafkaFactoryTests.cs | 101 +++++++++++++----- tests/Epam.Kafka.Tests/StatisticsTests.cs | 97 +++++++++++------ 23 files changed, 709 insertions(+), 194 deletions(-) create mode 100644 src/Epam.Kafka/Internals/Observable/Unsubscriber.cs rename src/Epam.Kafka/Metrics/{TopLevelMetrics.cs => CommonMetrics.cs} (62%) create mode 100644 src/Epam.Kafka/Stats/TransactionStatistics.cs create mode 100644 src/Epam.Kafka/Stats/WindowStats.cs create mode 100644 tests/Epam.Kafka.Tests/Common/MeterHelper.cs diff --git a/src/Epam.Kafka/Internals/ClientWrapper.cs b/src/Epam.Kafka/Internals/ClientWrapper.cs index 576e789..6f44dba 100644 --- a/src/Epam.Kafka/Internals/ClientWrapper.cs +++ b/src/Epam.Kafka/Internals/ClientWrapper.cs @@ -6,9 +6,21 @@ namespace Epam.Kafka.Internals; internal abstract class ClientWrapper : IClient { + private bool _disposed; protected abstract IClient Inner { get; } - public abstract void Dispose(); + public virtual void Dispose() + { + this._disposed = true; + } + + protected void EnsureNotDisposed() + { + if (this._disposed) + { + throw new ObjectDisposedException(this.GetType().Name); + } + } public int AddBrokers(string brokers) { diff --git a/src/Epam.Kafka/Internals/Observable/ObservableClient.cs b/src/Epam.Kafka/Internals/Observable/ObservableClient.cs index ea60ef3..4d7d6bc 100644 --- a/src/Epam.Kafka/Internals/Observable/ObservableClient.cs +++ b/src/Epam.Kafka/Internals/Observable/ObservableClient.cs @@ -6,10 +6,12 @@ namespace Epam.Kafka.Internals.Observable; -#pragma warning disable CA1031 // notify other listeners event if one of them failed +#pragma warning disable CA1031 // notify other listeners even if one of them failed internal abstract class ObservableClient : ClientWrapper, IObservable, IObservable, IObservable { + private readonly ParseStatsJsonObserver _parseObserver = new(); + protected List>? ErrorObservers { get; set; } protected List>? 
StatObservers { get; set; } @@ -23,7 +25,7 @@ protected void StatisticsHandler(string json) } catch { - // notify other listeners event if one of them failed + // notify other listeners even if one of them failed } } } @@ -38,46 +40,43 @@ protected void ErrorHandler(Error error) } catch { - // notify other listeners event if one of them failed + // notify other listeners even if one of them failed } } } - protected void ClearObservers() + protected void CompleteObservers() { - ClearObservers(this.ErrorObservers); - ClearObservers(this.StatObservers); + CompleteObservers(this.ErrorObservers); + CompleteObservers(this.StatObservers); } - private static void ClearObservers(List>? items) + private static void CompleteObservers(List>? items) { if (items == null) { return; } - foreach (IObserver item in items.ToArray()) + foreach (IObserver item in items) { - if (items.Contains(item)) + try + { + item.OnCompleted(); + } + catch { - try - { - item.OnCompleted(); - } - catch - { - // notify other listeners event if one of them failed - } + // notify other listeners even if one of them failed } } - - items.Clear(); } public IDisposable Subscribe(IObserver observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); + this.EnsureNotDisposed(); + if (this.ErrorObservers == null) { throw new InvalidOperationException( @@ -96,6 +95,8 @@ public IDisposable Subscribe(IObserver observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); + this.EnsureNotDisposed(); + if (this.StatObservers == null) { throw new InvalidOperationException( @@ -114,24 +115,12 @@ public IDisposable Subscribe(IObserver observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); - return this.Subscribe(new ParseStatsJsonObserver(observer)); - } - - private class Unsubscriber : IDisposable - { - private readonly List> _observers; - private readonly IObserver _observer; - - public Unsubscriber(List> observers, IObserver observer) - { - this._observers = observers; - this._observer = observer; - } + this.EnsureNotDisposed(); - public void Dispose() - { - this._observers.Remove(this._observer); - } +#pragma warning disable CA2000 // don't need to unsubscribe + this.Subscribe(this._parseObserver); +#pragma warning restore CA2000 + return this._parseObserver.Subscribe(observer); } } diff --git a/src/Epam.Kafka/Internals/Observable/ObservableConsumer.cs b/src/Epam.Kafka/Internals/Observable/ObservableConsumer.cs index 302df42..5bb5de7 100644 --- a/src/Epam.Kafka/Internals/Observable/ObservableConsumer.cs +++ b/src/Epam.Kafka/Internals/Observable/ObservableConsumer.cs @@ -1,6 +1,7 @@ // Copyright © 2024 EPAM Systems using Confluent.Kafka; + using Epam.Kafka.Metrics; namespace Epam.Kafka.Internals.Observable; @@ -39,170 +40,236 @@ public ObservableConsumer(ConsumerBuilder builder) this._inner = builder.Build(); } - protected override IClient Inner => this._inner; + protected override IClient Inner + { + get + { + this.EnsureNotDisposed(); + return this._inner; + } + } public override void Dispose() { + base.Dispose(); + try { this._inner.Dispose(); } finally { - this.ClearObservers(); + this.CompleteObservers(); } } public ConsumeResult Consume(int millisecondsTimeout) { + this.EnsureNotDisposed(); return this._inner.Consume(millisecondsTimeout); } public ConsumeResult Consume(CancellationToken cancellationToken = new CancellationToken()) { + this.EnsureNotDisposed(); return this._inner.Consume(cancellationToken); } public ConsumeResult Consume(TimeSpan timeout) { + 
this.EnsureNotDisposed(); return this._inner.Consume(timeout); } public void Subscribe(IEnumerable topics) { + this.EnsureNotDisposed(); this._inner.Subscribe(topics); } public void Subscribe(string topic) { + this.EnsureNotDisposed(); this._inner.Subscribe(topic); } public void Unsubscribe() { + this.EnsureNotDisposed(); this._inner.Unsubscribe(); } public void Assign(TopicPartition partition) { + this.EnsureNotDisposed(); this._inner.Assign(partition); } public void Assign(TopicPartitionOffset partition) { + this.EnsureNotDisposed(); this._inner.Assign(partition); } public void Assign(IEnumerable partitions) { + this.EnsureNotDisposed(); this._inner.Assign(partitions); } public void Assign(IEnumerable partitions) { + this.EnsureNotDisposed(); this._inner.Assign(partitions); } public void IncrementalAssign(IEnumerable partitions) { + this.EnsureNotDisposed(); this._inner.IncrementalAssign(partitions); } public void IncrementalAssign(IEnumerable partitions) { + this.EnsureNotDisposed(); this._inner.IncrementalAssign(partitions); } public void IncrementalUnassign(IEnumerable partitions) { + this.EnsureNotDisposed(); this._inner.IncrementalUnassign(partitions); } public void Unassign() { + this.EnsureNotDisposed(); this._inner.Unassign(); } public void StoreOffset(ConsumeResult result) { + this.EnsureNotDisposed(); this._inner.StoreOffset(result); } public void StoreOffset(TopicPartitionOffset offset) { + this.EnsureNotDisposed(); this._inner.StoreOffset(offset); } public List Commit() { + this.EnsureNotDisposed(); return this._inner.Commit(); } public void Commit(IEnumerable offsets) { + this.EnsureNotDisposed(); this._inner.Commit(offsets); } public void Commit(ConsumeResult result) { + this.EnsureNotDisposed(); this._inner.Commit(result); } public void Seek(TopicPartitionOffset tpo) { + this.EnsureNotDisposed(); this._inner.Seek(tpo); } public void Pause(IEnumerable partitions) { + this.EnsureNotDisposed(); this._inner.Pause(partitions); } public void Resume(IEnumerable partitions) { + this.EnsureNotDisposed(); this._inner.Resume(partitions); } public List Committed(TimeSpan timeout) { + this.EnsureNotDisposed(); return this._inner.Committed(timeout); } public List Committed(IEnumerable partitions, TimeSpan timeout) { + this.EnsureNotDisposed(); return this._inner.Committed(partitions, timeout); } public Offset Position(TopicPartition partition) { + this.EnsureNotDisposed(); return this._inner.Position(partition); } public List OffsetsForTimes(IEnumerable timestampsToSearch, TimeSpan timeout) { + this.EnsureNotDisposed(); return this._inner.OffsetsForTimes(timestampsToSearch, timeout); } public WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition) { + this.EnsureNotDisposed(); return this._inner.GetWatermarkOffsets(topicPartition); } public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout) { + this.EnsureNotDisposed(); return this._inner.QueryWatermarkOffsets(topicPartition, timeout); } public void Close() { + this.EnsureNotDisposed(); this._inner.Close(); } - public string MemberId => this._inner.MemberId; + public string MemberId + { + get + { + this.EnsureNotDisposed(); + return this._inner.MemberId; + } + } - public List Assignment => this._inner.Assignment; + public List Assignment + { + get + { + this.EnsureNotDisposed(); + return this._inner.Assignment; + } + } - public List Subscription => this._inner.Subscription; + public List Subscription + { + get + { + this.EnsureNotDisposed(); + return this._inner.Subscription; + } + } - 
public IConsumerGroupMetadata ConsumerGroupMetadata => this._inner.ConsumerGroupMetadata; + public IConsumerGroupMetadata ConsumerGroupMetadata + { + get + { + this.EnsureNotDisposed(); + return this._inner.ConsumerGroupMetadata; + } + } } \ No newline at end of file diff --git a/src/Epam.Kafka/Internals/Observable/ObservableProducer.cs b/src/Epam.Kafka/Internals/Observable/ObservableProducer.cs index 0aa9238..6f1f2cd 100644 --- a/src/Epam.Kafka/Internals/Observable/ObservableProducer.cs +++ b/src/Epam.Kafka/Internals/Observable/ObservableProducer.cs @@ -41,88 +41,111 @@ public ObservableProducer(ProducerBuilder builder) this._inner = builder.Build(); } - protected override IClient Inner => this._inner; + protected override IClient Inner + { + get + { + this.EnsureNotDisposed(); + return this._inner; + } + } public override void Dispose() { + base.Dispose(); + try { this._inner.Dispose(); } finally { - this.ClearObservers(); + this.CompleteObservers(); } } public Task> ProduceAsync(string topic, Message message, CancellationToken cancellationToken = new CancellationToken()) { + this.EnsureNotDisposed(); return this._inner.ProduceAsync(topic, message, cancellationToken); } public Task> ProduceAsync(TopicPartition topicPartition, Message message, CancellationToken cancellationToken = new CancellationToken()) { + this.EnsureNotDisposed(); return this._inner.ProduceAsync(topicPartition, message, cancellationToken); } public void Produce(string topic, Message message, Action>? deliveryHandler = null) { + this.EnsureNotDisposed(); this._inner.Produce(topic, message, deliveryHandler); } public void Produce(TopicPartition topicPartition, Message message, Action>? deliveryHandler = null) { + this.EnsureNotDisposed(); this._inner.Produce(topicPartition, message, deliveryHandler); } public int Poll(TimeSpan timeout) { + this.EnsureNotDisposed(); return this._inner.Poll(timeout); } public int Flush(TimeSpan timeout) { + this.EnsureNotDisposed(); return this._inner.Flush(timeout); } public void Flush(CancellationToken cancellationToken = new CancellationToken()) { + this.EnsureNotDisposed(); this._inner.Flush(cancellationToken); } public void InitTransactions(TimeSpan timeout) { + this.EnsureNotDisposed(); this._inner.InitTransactions(timeout); } public void BeginTransaction() { + this.EnsureNotDisposed(); this._inner.BeginTransaction(); } public void CommitTransaction(TimeSpan timeout) { + this.EnsureNotDisposed(); this._inner.CommitTransaction(timeout); } public void CommitTransaction() { + this.EnsureNotDisposed(); this._inner.CommitTransaction(); } public void AbortTransaction(TimeSpan timeout) { + this.EnsureNotDisposed(); this._inner.AbortTransaction(timeout); } public void AbortTransaction() { + this.EnsureNotDisposed(); this._inner.AbortTransaction(); } public void SendOffsetsToTransaction(IEnumerable offsets, IConsumerGroupMetadata groupMetadata, TimeSpan timeout) { + this.EnsureNotDisposed(); this._inner.SendOffsetsToTransaction(offsets, groupMetadata, timeout); } } \ No newline at end of file diff --git a/src/Epam.Kafka/Internals/Observable/Unsubscriber.cs b/src/Epam.Kafka/Internals/Observable/Unsubscriber.cs new file mode 100644 index 0000000..2535f73 --- /dev/null +++ b/src/Epam.Kafka/Internals/Observable/Unsubscriber.cs @@ -0,0 +1,20 @@ +// Copyright © 2024 EPAM Systems + +namespace Epam.Kafka.Internals.Observable; + +internal sealed class Unsubscriber : IDisposable +{ + private readonly List> _observers; + private readonly IObserver _observer; + + public Unsubscriber(List> observers, 
IObserver observer) + { + this._observers = observers; + this._observer = observer; + } + + public void Dispose() + { + this._observers.Remove(this._observer); + } +} \ No newline at end of file diff --git a/src/Epam.Kafka/Internals/SharedClient.cs b/src/Epam.Kafka/Internals/SharedClient.cs index 812e72c..22798f4 100644 --- a/src/Epam.Kafka/Internals/SharedClient.cs +++ b/src/Epam.Kafka/Internals/SharedClient.cs @@ -25,33 +25,45 @@ public SharedClient(IKafkaFactory kafkaFactory, string? cluster) this._client = kafkaFactory.CreateProducer(config, cluster); } - +#pragma warning disable CA2215 public override void Dispose() { // Have to implement IDisposable because it required by interface IClient defined in external library. // As a workaround we don't allow to dispose because this is shared client and it's lifetime should be equal to lifetime of factory. // Instead of this method factory will invoke DisposeInternal() on own dispose. } +#pragma warning restore CA2215 public void DisposeInternal() { + base.Dispose(); this._client.Dispose(); } - protected override IClient Inner => this._client; + protected override IClient Inner + { + get + { + this.EnsureNotDisposed(); + return this._client; + } + } public IDisposable Subscribe(IObserver observer) { + this.EnsureNotDisposed(); return ((IObservable)this._client).Subscribe(observer); } public IDisposable Subscribe(IObserver observer) { + this.EnsureNotDisposed(); return ((IObservable)this._client).Subscribe(observer); } public IDisposable Subscribe(IObserver observer) { + this.EnsureNotDisposed(); return ((IObservable)this._client).Subscribe(observer); } } \ No newline at end of file diff --git a/src/Epam.Kafka/Metrics/TopLevelMetrics.cs b/src/Epam.Kafka/Metrics/CommonMetrics.cs similarity index 62% rename from src/Epam.Kafka/Metrics/TopLevelMetrics.cs rename to src/Epam.Kafka/Metrics/CommonMetrics.cs index bf6524a..7b94fa0 100644 --- a/src/Epam.Kafka/Metrics/TopLevelMetrics.cs +++ b/src/Epam.Kafka/Metrics/CommonMetrics.cs @@ -4,15 +4,15 @@ namespace Epam.Kafka.Metrics; -internal class TopLevelMetrics : StatisticsMetrics +internal class CommonMetrics : StatisticsMetrics { - protected TopLevelMetrics() : base(Statistics.MeterName) + protected CommonMetrics() : base() { } - protected override void Initialize(Meter meter) + protected override void Initialize(Meter meter, Meter topParMeter) { - this.CreateTopLevelCounter(meter, "epam_kafka_stats_age", v => v.AgeMicroseconds, "microseconds", + this.CreateCounter(meter, "epam_kafka_stats_age", v => v.AgeMicroseconds, "microseconds", "Time since this client instance was created (microseconds)."); //this.CreateTopLevelCounter(meter, "epam_kafka_stats_replyq", v => v.OpsQueueCountGauge, description: diff --git a/src/Epam.Kafka/Metrics/ConsumerMetrics.cs b/src/Epam.Kafka/Metrics/ConsumerMetrics.cs index e88471b..602da8f 100644 --- a/src/Epam.Kafka/Metrics/ConsumerMetrics.cs +++ b/src/Epam.Kafka/Metrics/ConsumerMetrics.cs @@ -4,16 +4,19 @@ namespace Epam.Kafka.Metrics; -internal sealed class ConsumerMetrics : TopLevelMetrics +internal sealed class ConsumerMetrics : CommonMetrics { - protected override void Initialize(Meter meter) + protected override void Initialize(Meter meter, Meter topParMeter) { - base.Initialize(meter); + base.Initialize(meter, topParMeter); - this.CreateTopLevelCounter(meter, "epam_kafka_stats_rxmsgs", v => v.ConsumedMessagesTotal, + this.CreateCounter(meter, "epam_kafka_stats_rxmsgs", v => v.ConsumedMessagesTotal, description: "Total number of messages consumed, not including 
ignored messages (due to offset, etc), from Kafka brokers."); //this.CreateTopLevelCounter(meter, "epam_kafka_stats_rx", v => v.ConsumedRequestsTotal, // description: "Total number of responses received from Kafka brokers."); + + this.CreateTpGauge(topParMeter, "epam_kafka_stats_tp_lag", + m => m.Value.ConsumerLag, null, "Consumer lag"); } } \ No newline at end of file diff --git a/src/Epam.Kafka/Metrics/ProducerMetrics.cs b/src/Epam.Kafka/Metrics/ProducerMetrics.cs index 1da3ce9..c758c1e 100644 --- a/src/Epam.Kafka/Metrics/ProducerMetrics.cs +++ b/src/Epam.Kafka/Metrics/ProducerMetrics.cs @@ -4,13 +4,13 @@ namespace Epam.Kafka.Metrics; -internal sealed class ProducerMetrics : TopLevelMetrics +internal sealed class ProducerMetrics : CommonMetrics { - protected override void Initialize(Meter meter) + protected override void Initialize(Meter meter, Meter topParMeter) { - base.Initialize(meter); + base.Initialize(meter, topParMeter); - this.CreateTopLevelCounter(meter, "epam_kafka_stats_txmsgs", v => v.TransmittedMessagesTotal, + this.CreateCounter(meter, "epam_kafka_stats_txmsgs", v => v.TransmittedMessagesTotal, description: "Total number of messages transmitted (produced) to Kafka brokers"); //this.CreateTopLevelCounter(meter, "epam_kafka_stats_tx", v => v.TransmittedRequestsTotal, diff --git a/src/Epam.Kafka/Metrics/StatisticsMetrics.cs b/src/Epam.Kafka/Metrics/StatisticsMetrics.cs index a29c561..a781f75 100644 --- a/src/Epam.Kafka/Metrics/StatisticsMetrics.cs +++ b/src/Epam.Kafka/Metrics/StatisticsMetrics.cs @@ -1,6 +1,8 @@ // Copyright © 2024 EPAM Systems using System.Diagnostics.Metrics; +using System.Text.RegularExpressions; +using Epam.Kafka.Stats; namespace Epam.Kafka.Metrics; @@ -8,62 +10,69 @@ namespace Epam.Kafka.Metrics; internal abstract class StatisticsMetrics : IObserver { + private static readonly Regex HandlerRegex = new ("^(.*)#(consumer|producer)-(\\d{1,7})$", + RegexOptions.Compiled | RegexOptions.IgnoreCase); + private const string NameTag = "Name"; private const string HandlerTag = "Handler"; - private const string InstanceTag = "Instance"; + private const string TypeTag = "Type"; + private const string TopicTagName = "Topic"; + private const string PartitionTagName = "Partition"; + private readonly object _syncObj = new(); private bool _initialized; - private readonly Meter _meter; + private Meter? _topLevelMeter; + private Meter? _topParMeter; protected Statistics? Value { get; private set; } protected static IEnumerable> Empty { get; } = Enumerable.Empty>(); - protected KeyValuePair[]? TopLevelTags { get; private set; } - - protected StatisticsMetrics(string meterName) - { - if (meterName == null) throw new ArgumentNullException(nameof(meterName)); - - this._meter = new Meter(meterName); - } - public void OnNext(Statistics value) { this.Value = value; - this.TopLevelTags ??= new[] - { - new KeyValuePair(NameTag, value.ClientId), - new KeyValuePair(HandlerTag, value.Name), - new KeyValuePair(InstanceTag, value.Type), - }; if (!this._initialized) { - lock (this._meter) + lock (this._syncObj) { if (!this._initialized) { - this.Initialize(this._meter); + Match match = HandlerRegex.Match(value.Name); + + string name = match.Success ? 
match.Result("$3") : value.Name; + + KeyValuePair[] topLevelTags = new[] + { + new KeyValuePair(NameTag, value.ClientId), + new KeyValuePair(HandlerTag, name), + new KeyValuePair(TypeTag, value.Type), + }; + + this._topLevelMeter = new Meter(Statistics.TopLevelMeterName, null, topLevelTags); + this._topParMeter = new Meter(Statistics.TopicPartitionMeterName, null, topLevelTags); + + this.Initialize(this._topLevelMeter, this._topParMeter); + this._initialized = true; } } } } - protected abstract void Initialize(Meter meter); + protected abstract void Initialize(Meter meter, Meter topParMeter); public void OnError(Exception error) { this.Value = null; - this.TopLevelTags = null; } public void OnCompleted() { - this._meter.Dispose(); + this._topLevelMeter?.Dispose(); + this._topParMeter?.Dispose(); } - protected void CreateTopLevelGauge(Meter meter, string name, Func factory) + protected void CreateGauge(Meter meter, string name, Func factory) { if (meter == null) throw new ArgumentNullException(nameof(meter)); if (name == null) throw new ArgumentNullException(nameof(name)); @@ -75,15 +84,14 @@ protected void CreateTopLevelGauge(Meter meter, string name, Func(factory(value), this.TopLevelTags), 1); + return Enumerable.Repeat(new Measurement(factory(value)), 1); } return Empty; }); } - protected void CreateTopLevelCounter(Meter meter, string name, Func factory, string? unit = null, string? description = null) + protected void CreateCounter(Meter meter, string name, Func factory, string? unit = null, string? description = null) { if (meter == null) throw new ArgumentNullException(nameof(meter)); if (name == null) throw new ArgumentNullException(nameof(name)); @@ -95,8 +103,34 @@ protected void CreateTopLevelCounter(Meter meter, string name, Func(factory(value), this.TopLevelTags), 1); + return Enumerable.Repeat(new Measurement(factory(value)), 1); + } + + return Empty; + }, unit, description); + } + + protected void CreateTpGauge(Meter meter, string name, Func,long> factory, string? unit = null, + string? description = null) + { + if (meter == null) throw new ArgumentNullException(nameof(meter)); + if (name == null) throw new ArgumentNullException(nameof(name)); + + meter.CreateObservableGauge(name, () => + { + Statistics? v = this.Value; + + if (v != null) + { + return v.Topics + .SelectMany(p => + p.Value.Partitions.Where(x => x.Key != PartitionStatistics.InternalUnassignedPartition) + .Select(x => new KeyValuePair(p.Value, x.Value))) + .Select(m => new Measurement(factory(m), new[] + { + new KeyValuePair(TopicTagName, m.Key.Name), + new KeyValuePair(PartitionTagName, m.Value.Id) + })); } return Empty; diff --git a/src/Epam.Kafka/Statistics.cs b/src/Epam.Kafka/Statistics.cs index c887526..dd21e31 100644 --- a/src/Epam.Kafka/Statistics.cs +++ b/src/Epam.Kafka/Statistics.cs @@ -16,7 +16,12 @@ public class Statistics /// /// Name of used to expose top level statistics. /// - public const string MeterName = "Epam.Kafka.Statistics"; + public const string TopLevelMeterName = "Epam.Kafka.Statistics"; + + /// + /// Name of used to expose top topic partition statistics. + /// + public const string TopicPartitionMeterName = "Epam.Kafka.Statistics.Toppar"; /// /// Create new instance of object from json representation. @@ -175,10 +180,14 @@ public static Statistics FromJson(string json) public Dictionary Topics { get; } = new(); /// - /// Consumer group metrics. See . + /// Consumer group metrics. /// [JsonPropertyName("cgrp")] - public GroupStatistics? 
ConsumerGroups { get; set; } + public GroupStatistics ConsumerGroup { get; } = new(); - //TODO: eos + /// + /// EOS / Idempotent producer state and metrics. + /// + [JsonPropertyName("eos")] + public GroupStatistics ProducerTransaction { get; } = new(); } \ No newline at end of file diff --git a/src/Epam.Kafka/Stats/BrokerStatistics.cs b/src/Epam.Kafka/Stats/BrokerStatistics.cs index 6c0640c..dd65c86 100644 --- a/src/Epam.Kafka/Stats/BrokerStatistics.cs +++ b/src/Epam.Kafka/Stats/BrokerStatistics.cs @@ -1,8 +1,9 @@ // Copyright © 2024 EPAM Systems -using System.Text.Json.Serialization; using Confluent.Kafka; +using System.Text.Json.Serialization; + namespace Epam.Kafka.Stats; /// diff --git a/src/Epam.Kafka/Stats/JsonContext.cs b/src/Epam.Kafka/Stats/JsonContext.cs index a8e6c89..e5c6842 100644 --- a/src/Epam.Kafka/Stats/JsonContext.cs +++ b/src/Epam.Kafka/Stats/JsonContext.cs @@ -1,11 +1,10 @@ // Copyright © 2024 EPAM Systems -using System.Text.Json; +using Confluent.Kafka; +using System.Text.Json; using System.Text.Json.Serialization; -using Confluent.Kafka; - namespace Epam.Kafka.Stats; [JsonSourceGenerationOptions(JsonSerializerDefaults.Web, diff --git a/src/Epam.Kafka/Stats/ParseStatsJsonObserver.cs b/src/Epam.Kafka/Stats/ParseStatsJsonObserver.cs index 18c3a39..29e59cf 100644 --- a/src/Epam.Kafka/Stats/ParseStatsJsonObserver.cs +++ b/src/Epam.Kafka/Stats/ParseStatsJsonObserver.cs @@ -1,15 +1,14 @@ // Copyright © 2024 EPAM Systems +using Epam.Kafka.Internals.Observable; + namespace Epam.Kafka.Stats; -internal sealed class ParseStatsJsonObserver : IObserver -{ - private readonly IObserver _inner; +#pragma warning disable CA1031 // notify other listeners even if one of them failed - public ParseStatsJsonObserver(IObserver inner) - { - this._inner = inner ?? 
throw new ArgumentNullException(nameof(inner)); - } +internal sealed class ParseStatsJsonObserver : IObserver, IObservable +{ + private readonly List> _inner = new(); public void OnNext(string value) { @@ -20,21 +19,65 @@ public void OnNext(string value) } catch (Exception e) { - this._inner.OnError(e); + this.OnError(e); throw; } - this._inner.OnNext(statistics); + foreach (IObserver observer in this._inner) + { + try + { + observer.OnNext(statistics); + } + catch + { + // notify other listeners even if one of them failed + } + } } public void OnError(Exception error) { - this._inner.OnError(error); + foreach (IObserver observer in this._inner) + { + try + { + observer.OnError(error); + } + catch + { + // notify other listeners even if one of them failed + } + } } public void OnCompleted() { - this._inner.OnCompleted(); + foreach (IObserver observer in this._inner) + { + try + { + observer.OnCompleted(); + } + catch + { + // notify other listeners even if one of them failed + } + } + + this._inner.Clear(); } -} \ No newline at end of file + + public IDisposable Subscribe(IObserver observer) + { + if (!this._inner.Contains(observer)) + { + this._inner.Add(observer); + } + + return new Unsubscriber(this._inner, observer); + } +} + +#pragma warning restore CA1031 \ No newline at end of file diff --git a/src/Epam.Kafka/Stats/PartitionStatistics.cs b/src/Epam.Kafka/Stats/PartitionStatistics.cs index f51c82b..33c0095 100644 --- a/src/Epam.Kafka/Stats/PartitionStatistics.cs +++ b/src/Epam.Kafka/Stats/PartitionStatistics.cs @@ -20,6 +20,30 @@ public class PartitionStatistics [JsonPropertyName("partition")] public long Id { get; set; } + /// + /// Partition is explicitly desired by application + /// + [JsonPropertyName("desired")] + public bool Desired { get; set; } + + /// + /// Partition not seen in topic metadata from broker + /// + [JsonPropertyName("unknown")] + public bool Unknown { get; set; } + + /// + /// Number of messages waiting to be produced in first-level queue + /// + [JsonPropertyName("msgq_cnt")] + public long QueueCount { get; set; } + + /// + /// Number of pre-fetched messages in fetch queue + /// + [JsonPropertyName("fetchq_cnt")] + public long FetchCount { get; set; } + /// /// Consumer fetch state for this partition (none, stopping, stopped, offset-query, offset-wait, active) /// diff --git a/src/Epam.Kafka/Stats/TopicStatistics.cs b/src/Epam.Kafka/Stats/TopicStatistics.cs index e9d38b6..df6d341 100644 --- a/src/Epam.Kafka/Stats/TopicStatistics.cs +++ b/src/Epam.Kafka/Stats/TopicStatistics.cs @@ -27,6 +27,18 @@ public class TopicStatistics [JsonPropertyName("metadata_age")] public long MetadataAgeMilliseconds { get; set; } + /// + /// Batch sizes in bytes + /// + [JsonPropertyName("batchsize")] + public WindowStatistics BatchSize { get; } = new(); + + /// + /// Batch message counts + /// + [JsonPropertyName("batchcnt")] + public WindowStatistics BatchCount { get; } = new(); + /// /// Partitions dict, key is partition id, value is /// diff --git a/src/Epam.Kafka/Stats/TransactionStatistics.cs b/src/Epam.Kafka/Stats/TransactionStatistics.cs new file mode 100644 index 0000000..4ff7075 --- /dev/null +++ b/src/Epam.Kafka/Stats/TransactionStatistics.cs @@ -0,0 +1,41 @@ +// Copyright © 2024 EPAM Systems + +using System.Text.Json.Serialization; + +namespace Epam.Kafka.Stats; + +/// +/// EOS / Idempotent producer state and metrics +/// +public class TransactionStatistics +{ + /// + /// Current idempotent producer id state. 
+ /// + [JsonPropertyName("idemp_state")] + public string IdempotentState { get; set; } = string.Empty; + + /// + /// Time elapsed since last change (milliseconds). + /// + [JsonPropertyName("idemp_stateage")] + public long IdempotentAgeMilliseconds { get; set; } + + /// + /// Current transactional producer state. + /// + [JsonPropertyName("txn_state")] + public string TransactionState { get; set; } = string.Empty; + + /// + /// Time elapsed since last change (milliseconds). + /// + [JsonPropertyName("txn_stateage")] + public long TransactionAgeMilliseconds { get; set; } + + /// + /// Transactional state allows enqueuing (producing) new messages. + /// + [JsonPropertyName("txn_may_enq")] + public bool EnqAllowed { get; set; } +} \ No newline at end of file diff --git a/src/Epam.Kafka/Stats/WindowStats.cs b/src/Epam.Kafka/Stats/WindowStats.cs new file mode 100644 index 0000000..be05a3f --- /dev/null +++ b/src/Epam.Kafka/Stats/WindowStats.cs @@ -0,0 +1,41 @@ +// Copyright © 2024 EPAM Systems + +using System.Text.Json.Serialization; + +namespace Epam.Kafka.Stats; + +/// +/// Rolling window statistics. The values are in microseconds unless otherwise stated. +/// +public class WindowStatistics +{ + /// + /// Smallest value + /// + [JsonPropertyName("min")] + public long Min { get; set; } + + /// + /// Largest value + /// + [JsonPropertyName("max")] + public long Max { get; set; } + + /// + /// Average value + /// + [JsonPropertyName("avg")] + public long Avg { get; set; } + + /// + /// Sum of values + /// + [JsonPropertyName("sum")] + public long Sum { get; set; } + + /// + /// Number of values sampled + /// + [JsonPropertyName("cnt")] + public long Count { get; set; } +} \ No newline at end of file diff --git a/tests/Epam.Kafka.Tests/Common/MeterHelper.cs b/tests/Epam.Kafka.Tests/Common/MeterHelper.cs new file mode 100644 index 0000000..fe39c87 --- /dev/null +++ b/tests/Epam.Kafka.Tests/Common/MeterHelper.cs @@ -0,0 +1,66 @@ +// Copyright © 2024 EPAM Systems + +using System.Diagnostics.Metrics; +using Xunit.Abstractions; + +namespace Epam.Kafka.Tests.Common; + +public sealed class MeterHelper : IDisposable +{ + private readonly MeterListener _listener = new (); + + public IDictionary Results { get; } = new Dictionary(); + + public MeterHelper(string meterName) + { + this._listener.InstrumentPublished = (instrument, listener) => { listener.EnableMeasurementEvents(instrument); }; + + this._listener.SetMeasurementEventCallback((instrument, measurement, tags, _) => + { + if (instrument.Meter.Name != meterName) + { + return; + } + + KeyValuePair[] t = tags.ToArray(); + + if (instrument.Meter.Tags != null) + { + t = instrument.Meter.Tags.Concat(t).ToArray(); + } + + string ts = string.Join("-", t.Select(x => $"{x.Key}:{x.Value}")); + + string key = $"{instrument.Name}_{ts}"; + + this.Results[key] = measurement; + }); + + this._listener.Start(); + } + + public void RecordObservableInstruments(ITestOutputHelper? 
output = null) + { + this._listener.RecordObservableInstruments(); + + if (output != null) + { + this.Print(output); + } + } + + public void Print(ITestOutputHelper output) + { + if (output == null) throw new ArgumentNullException(nameof(output)); + + output.WriteLine(this.Results.Count.ToString("D")); + foreach (var kvp in this.Results) + { + output.WriteLine($"{kvp.Key}: {kvp.Value}"); + } + } + public void Dispose() + { + this._listener.Dispose(); + } +} \ No newline at end of file diff --git a/tests/Epam.Kafka.Tests/Data/ConsumerStat.json b/tests/Epam.Kafka.Tests/Data/ConsumerStat.json index 69d91b8..4e8b398 100644 --- a/tests/Epam.Kafka.Tests/Data/ConsumerStat.json +++ b/tests/Epam.Kafka.Tests/Data/ConsumerStat.json @@ -938,36 +938,36 @@ "age": 23753, "metadata_age": 35918, "batchsize": { - "min": 0, - "max": 0, - "avg": 0, - "sum": 0, - "stddev": 0, - "p50": 0, - "p75": 0, - "p90": 0, - "p95": 0, - "p99": 0, - "p99_99": 0, + "min": 99, + "max": 391805, + "avg": 272593, + "sum": 18808985, + "stddev": 180408, + "p50": 393215, + "p75": 393215, + "p90": 393215, + "p95": 393215, + "p99": 393215, + "p99_99": 393215, "outofrange": 0, "hdrsize": 14448, - "cnt": 0 + "cnt": 69 }, "batchcnt": { - "min": 0, - "max": 0, - "avg": 0, - "sum": 0, - "stddev": 0, - "p50": 0, - "p75": 0, - "p90": 0, - "p95": 0, - "p99": 0, - "p99_99": 0, + "min": 1, + "max": 10000, + "avg": 6956, + "sum": 480028, + "stddev": 4608, + "p50": 10047, + "p75": 10047, + "p90": 10047, + "p95": 10047, + "p99": 10047, + "p99_99": 10047, "outofrange": 0, "hdrsize": 8304, - "cnt": 0 + "cnt": 69 }, "partitions": { "0": { diff --git a/tests/Epam.Kafka.Tests/Epam.Kafka.approved.txt b/tests/Epam.Kafka.Tests/Epam.Kafka.approved.txt index 161eb4d..15cc4f7 100644 --- a/tests/Epam.Kafka.Tests/Epam.Kafka.approved.txt +++ b/tests/Epam.Kafka.Tests/Epam.Kafka.approved.txt @@ -60,7 +60,8 @@ namespace Epam.Kafka } public class Statistics { - public const string MeterName = "Epam.Kafka.Statistics"; + public const string TopLevelMeterName = "Epam.Kafka.Statistics"; + public const string TopicPartitionMeterName = "Epam.Kafka.Statistics.Toppar"; public Statistics() { } [System.Text.Json.Serialization.JsonPropertyName("age")] public long AgeMicroseconds { get; set; } @@ -77,7 +78,7 @@ namespace Epam.Kafka [System.Text.Json.Serialization.JsonPropertyName("rx")] public long ConsumedRequestsTotal { get; set; } [System.Text.Json.Serialization.JsonPropertyName("cgrp")] - public Epam.Kafka.Stats.GroupStatistics? 
ConsumerGroups { get; set; } + public Epam.Kafka.Stats.GroupStatistics ConsumerGroup { get; } [System.Text.Json.Serialization.JsonPropertyName("name")] public string Name { get; set; } [System.Text.Json.Serialization.JsonPropertyName("replyq")] @@ -90,6 +91,8 @@ namespace Epam.Kafka public long ProducerQueueSizeGauge { get; set; } [System.Text.Json.Serialization.JsonPropertyName("msg_size_max")] public long ProducerQueueSizeMax { get; set; } + [System.Text.Json.Serialization.JsonPropertyName("eos")] + public Epam.Kafka.Stats.GroupStatistics ProducerTransaction { get; } [System.Text.Json.Serialization.JsonIgnore] public string RawJson { get; } [System.Text.Json.Serialization.JsonPropertyName("time")] @@ -185,6 +188,10 @@ namespace Epam.Kafka.Stats public long CommittedOffset { get; set; } [System.Text.Json.Serialization.JsonPropertyName("consumer_lag")] public long ConsumerLag { get; set; } + [System.Text.Json.Serialization.JsonPropertyName("desired")] + public bool Desired { get; set; } + [System.Text.Json.Serialization.JsonPropertyName("fetchq_cnt")] + public long FetchCount { get; set; } [System.Text.Json.Serialization.JsonPropertyName("fetch_state")] public string FetchState { get; set; } [System.Text.Json.Serialization.JsonPropertyName("hi_offset")] @@ -197,12 +204,20 @@ namespace Epam.Kafka.Stats public long LsOffset { get; set; } [System.Text.Json.Serialization.JsonPropertyName("next_offset")] public long NextOffset { get; set; } + [System.Text.Json.Serialization.JsonPropertyName("msgq_cnt")] + public long QueueCount { get; set; } + [System.Text.Json.Serialization.JsonPropertyName("unknown")] + public bool Unknown { get; set; } } public class TopicStatistics { public TopicStatistics() { } [System.Text.Json.Serialization.JsonPropertyName("age")] public long AgeMilliseconds { get; set; } + [System.Text.Json.Serialization.JsonPropertyName("batchcnt")] + public Epam.Kafka.Stats.WindowStatistics BatchCount { get; } + [System.Text.Json.Serialization.JsonPropertyName("batchsize")] + public Epam.Kafka.Stats.WindowStatistics BatchSize { get; } [System.Text.Json.Serialization.JsonPropertyName("metadata_age")] public long MetadataAgeMilliseconds { get; set; } [System.Text.Json.Serialization.JsonPropertyName("topic")] @@ -210,4 +225,32 @@ namespace Epam.Kafka.Stats [System.Text.Json.Serialization.JsonPropertyName("partitions")] public System.Collections.Generic.Dictionary Partitions { get; } } + public class TransactionStatistics + { + public TransactionStatistics() { } + [System.Text.Json.Serialization.JsonPropertyName("txn_may_enq")] + public bool EnqAllowed { get; set; } + [System.Text.Json.Serialization.JsonPropertyName("idemp_stateage")] + public long IdempotentAgeMilliseconds { get; set; } + [System.Text.Json.Serialization.JsonPropertyName("idemp_state")] + public string IdempotentState { get; set; } + [System.Text.Json.Serialization.JsonPropertyName("txn_stateage")] + public long TransactionAgeMilliseconds { get; set; } + [System.Text.Json.Serialization.JsonPropertyName("txn_state")] + public string TransactionState { get; set; } + } + public class WindowStatistics + { + public WindowStatistics() { } + [System.Text.Json.Serialization.JsonPropertyName("avg")] + public long Avg { get; set; } + [System.Text.Json.Serialization.JsonPropertyName("cnt")] + public long Count { get; set; } + [System.Text.Json.Serialization.JsonPropertyName("max")] + public long Max { get; set; } + [System.Text.Json.Serialization.JsonPropertyName("min")] + public long Min { get; set; } + 
[System.Text.Json.Serialization.JsonPropertyName("sum")] + public long Sum { get; set; } + } } diff --git a/tests/Epam.Kafka.Tests/KafkaFactoryTests.cs b/tests/Epam.Kafka.Tests/KafkaFactoryTests.cs index 8539860..3b6e7b2 100644 --- a/tests/Epam.Kafka.Tests/KafkaFactoryTests.cs +++ b/tests/Epam.Kafka.Tests/KafkaFactoryTests.cs @@ -11,8 +11,6 @@ using Shouldly; -using System.Diagnostics.Metrics; - using Xunit; using Xunit.Abstractions; @@ -153,7 +151,7 @@ public void CreateOauthConsumerCustom() configure: b => b.SetOAuthBearerTokenRefreshHandler( (_, _) => { invoked = true; })); - + Assert.NotNull(consumer); consumer.Consume(1000); @@ -339,6 +337,7 @@ public void ObservableConsumerErrors() var errorObs = new Mock>(); var statsObs = new Mock>(); + var parsedObs = new Mock>(); Assert.Throws(() => consumer.ShouldBeAssignableTo>()!.Subscribe(errorObs.Object)) @@ -346,6 +345,44 @@ public void ObservableConsumerErrors() Assert.Throws(() => consumer.ShouldBeAssignableTo>()!.Subscribe(statsObs.Object)) .Message.ShouldContain("Cannot subscribe to statistics because handler was explicitly set"); + + consumer.Dispose(); + + Assert.Throws(() => consumer.ShouldBeAssignableTo>()!.Subscribe(errorObs.Object)); + Assert.Throws(() => consumer.ShouldBeAssignableTo>()!.Subscribe(statsObs.Object)); + Assert.Throws(() => consumer.ShouldBeAssignableTo>()!.Subscribe(parsedObs.Object)); + + List tp = new List { new(string.Empty, 0) }; + List tpo = new List { new(tp[0], 0) }; + + Assert.Throws(() => consumer.Subscription); + Assert.Throws(() => consumer.ConsumerGroupMetadata); + Assert.Throws(() => consumer.Assignment); + Assert.Throws(() => consumer.MemberId); + Assert.Throws(() => consumer.Assign(tp[0])); + Assert.Throws(() => consumer.Assign(tpo[0])); + Assert.Throws(() => consumer.Assign(tp)); + Assert.Throws(() => consumer.Assign(tpo)); + Assert.Throws(() => consumer.Close()); + Assert.Throws(() => consumer.Commit()); + Assert.Throws(() => consumer.Commit(tpo)); + Assert.Throws(() => consumer.Commit(new ConsumeResult())); + Assert.Throws(() => consumer.Committed(TimeSpan.Zero)); + Assert.Throws(() => consumer.Committed(tp, TimeSpan.Zero)); + Assert.Throws(() => consumer.Consume()); + Assert.Throws(() => consumer.Consume(TimeSpan.Zero)); + Assert.Throws(() => consumer.Consume(0)); + Assert.Throws(() => consumer.GetWatermarkOffsets(tp[0])); + Assert.Throws(() => consumer.IncrementalAssign(tp)); + Assert.Throws(() => consumer.IncrementalAssign(tpo)); + Assert.Throws(() => consumer.IncrementalUnassign(tp)); + Assert.Throws(() => consumer.OffsetsForTimes(null, TimeSpan.Zero)); + Assert.Throws(() => consumer.Pause(tp)); + Assert.Throws(() => consumer.Position(tp[0])); + Assert.Throws(() => consumer.Resume(tp)); + Assert.Throws(() => consumer.Seek(tpo[0])); + Assert.Throws(() => consumer.StoreOffset(tpo[0])); + Assert.Throws(() => consumer.StoreOffset(new ConsumeResult())); } [Fact] @@ -368,6 +405,7 @@ public void ObservableProducerErrors() var errorObs = new Mock>(); var statsObs = new Mock>(); + var parsedObs = new Mock>(); Assert.Throws(() => producer.ShouldBeAssignableTo>()!.Subscribe(errorObs.Object)) @@ -376,6 +414,30 @@ public void ObservableProducerErrors() Assert.Throws(() => producer.ShouldBeAssignableTo>()!.Subscribe(statsObs.Object)) .Message.ShouldContain("Cannot subscribe to statistics because handler was explicitly set"); + + producer.Dispose(); + + Assert.Throws(() => producer.ShouldBeAssignableTo>()!.Subscribe(errorObs.Object)); + Assert.Throws(() => 
producer.ShouldBeAssignableTo>()!.Subscribe(statsObs.Object)); + Assert.Throws(() => producer.ShouldBeAssignableTo>()!.Subscribe(parsedObs.Object)); + + Assert.Throws(() => producer.Name); + Assert.Throws(() => producer.Handle); + Assert.Throws(() => producer.Poll(TimeSpan.Zero)); + Assert.Throws(() => producer.Produce(string.Empty, null)); + Assert.Throws(() => producer.Produce(new TopicPartition(string.Empty, 0), null)); + Assert.Throws(() => producer.AbortTransaction()); + Assert.Throws(() => producer.AbortTransaction(TimeSpan.Zero)); + Assert.Throws(() => producer.CommitTransaction()); + Assert.Throws(() => producer.CommitTransaction(TimeSpan.Zero)); + Assert.Throws(() => producer.BeginTransaction()); + Assert.Throws(() => producer.Flush()); + Assert.Throws(() => producer.Flush(TimeSpan.Zero)); + Assert.Throws(() => producer.SendOffsetsToTransaction(null!, null!, TimeSpan.Zero)); + Assert.Throws(() => producer.AddBrokers(null)); + Assert.Throws(() => producer.SetSaslCredentials(null, null)); + Assert.Throws(() => producer.OAuthBearerSetToken(null, 0, null)); + Assert.Throws(() => producer.OAuthBearerSetTokenFailure(null)); } [Theory] @@ -437,37 +499,22 @@ public void CreateDefaultClientWithMetrics() MockCluster.AddMockCluster(this).WithClusterConfig(MockCluster.ClusterName) .Configure(x => x.ClientConfig.StatisticsIntervalMs = 100); - MeterListener ml = new MeterListener(); - - ml.InstrumentPublished = (instrument, listener) => { listener.EnableMeasurementEvents(instrument); }; - - Dictionary results = new(); - - ml.SetMeasurementEventCallback((instrument, measurement, tags, _) => - { - string ts = string.Join("-", tags.ToArray().Select(x => $"{x.Key}:{x.Value}")); - - string key = $"{instrument.Name}_{ts}"; - - results[key] = measurement; - }); - - ml.Start(); + using MeterHelper ml = new(Statistics.TopLevelMeterName); ml.RecordObservableInstruments(); - ml.RecordObservableInstruments(); - results.Count.ShouldBe(0); + ml.Results.Count.ShouldBe(0); using IClient c1 = this.KafkaFactory.GetOrCreateClient(); Assert.NotNull(c1); Task.Delay(200).Wait(); - ml.RecordObservableInstruments(); + ml.RecordObservableInstruments(this.Output); - foreach (var kvp in results) - { - this.Output.WriteLine($"{kvp.Key}: {kvp.Value}"); - } + ml.Results.Count.ShouldBe(2); + + Task.Delay(1000).Wait(); + + ml.RecordObservableInstruments(this.Output); - results.Count.ShouldBe(2); + ml.Results.Count.ShouldBe(2); } [Fact] diff --git a/tests/Epam.Kafka.Tests/StatisticsTests.cs b/tests/Epam.Kafka.Tests/StatisticsTests.cs index 2557fc6..8002031 100644 --- a/tests/Epam.Kafka.Tests/StatisticsTests.cs +++ b/tests/Epam.Kafka.Tests/StatisticsTests.cs @@ -1,19 +1,27 @@ // Copyright © 2024 EPAM Systems -using System.Diagnostics.Metrics; - using Confluent.Kafka; + using Epam.Kafka.Metrics; using Epam.Kafka.Stats; +using Epam.Kafka.Tests.Common; using Shouldly; using Xunit; +using Xunit.Abstractions; namespace Epam.Kafka.Tests; public class StatisticsTests { + public ITestOutputHelper Output { get; } + + public StatisticsTests(ITestOutputHelper output) + { + this.Output = output ?? 
throw new ArgumentNullException(nameof(output)); + } + [Fact] public void ParseErrors() { @@ -63,6 +71,8 @@ public void ParseConsumerOk() topic.Name.ShouldBe("epam-kafka-sample-topic-2"); topic.AgeMilliseconds.ShouldBe(23753); topic.MetadataAgeMilliseconds.ShouldBe(35918); + topic.BatchSize.Sum.ShouldBe(18808985); + topic.BatchCount.Sum.ShouldBe(480028); topic.Partitions.ShouldNotBeNull().Count.ShouldBe(2); PartitionStatistics partition = topic.Partitions[0]; @@ -74,7 +84,7 @@ public void ParseConsumerOk() partition.ConsumerLag.ShouldBe(1); partition.FetchState.ShouldBe("active"); - GroupStatistics group = value.ConsumerGroups.ShouldNotBeNull(); + GroupStatistics group = value.ConsumerGroup.ShouldNotBeNull(); group.State.ShouldBe("up"); group.StateAgeMilliseconds.ShouldBe(39225); group.JoinState.ShouldBe("steady"); @@ -85,22 +95,7 @@ public void ParseConsumerOk() [Fact] public void TopLevelMetricsTests() { - MeterListener ml = new MeterListener(); - - ml.InstrumentPublished = (instrument, listener) => { listener.EnableMeasurementEvents(instrument); }; - - Dictionary results = new(); - - ml.SetMeasurementEventCallback((instrument, measurement, tags, _) => - { - string ts = string.Join("-", tags.ToArray().Select(x => $"{x.Key}:{x.Value}")); - - string key = $"{instrument.Name}_{ts}"; - - results[key] = measurement; - }); - - ml.Start(); + using MeterHelper ml = new(Statistics.TopLevelMeterName); ConsumerMetrics cm = new(); ProducerMetrics pm = new(); @@ -108,29 +103,63 @@ public void TopLevelMetricsTests() cm.OnNext(new Statistics { ClientId = "c1", Name = "n1", Type = "c", ConsumedMessagesTotal = 123 }); pm.OnNext(new Statistics { ClientId = "c1", Name = "n2", Type = "p", TransmittedMessagesTotal = 111 }); - ml.RecordObservableInstruments(); - results.Count.ShouldBe(4); - results["epam_kafka_stats_rxmsgs_Name:c1-Handler:n1-Instance:c"].ShouldBe(123); - results["epam_kafka_stats_txmsgs_Name:c1-Handler:n2-Instance:p"].ShouldBe(111); - results["epam_kafka_stats_age_Name:c1-Handler:n1-Instance:c"].ShouldBe(0); - results["epam_kafka_stats_age_Name:c1-Handler:n2-Instance:p"].ShouldBe(0); + ml.RecordObservableInstruments(this.Output); + + ml.Results.Count.ShouldBe(4); + ml.Results["epam_kafka_stats_rxmsgs_Handler:n1-Name:c1-Type:c"].ShouldBe(123); + ml.Results["epam_kafka_stats_txmsgs_Handler:n2-Name:c1-Type:p"].ShouldBe(111); + ml.Results["epam_kafka_stats_age_Handler:n1-Name:c1-Type:c"].ShouldBe(0); + ml.Results["epam_kafka_stats_age_Handler:n2-Name:c1-Type:p"].ShouldBe(0); cm.OnCompleted(); cm.OnNext(new Statistics { ClientId = "c1", Name = "n1", ConsumedMessagesTotal = 124 }); - pm.OnNext(new Statistics { ClientId = "p1", Name = "n1", TransmittedMessagesTotal = 112, AgeMicroseconds = 555}); + pm.OnNext(new Statistics { ClientId = "p1", Name = "n1", TransmittedMessagesTotal = 112, AgeMicroseconds = 555 }); - results.Clear(); - ml.RecordObservableInstruments(); + ml.Results.Clear(); + ml.RecordObservableInstruments(this.Output); - results.Count.ShouldBe(2); - results["epam_kafka_stats_txmsgs_Name:c1-Handler:n2-Instance:p"].ShouldBe(112); - results["epam_kafka_stats_age_Name:c1-Handler:n2-Instance:p"].ShouldBe(555); + ml.Results.Count.ShouldBe(2); + ml.Results["epam_kafka_stats_txmsgs_Handler:n2-Name:c1-Type:p"].ShouldBe(112); + ml.Results["epam_kafka_stats_age_Handler:n2-Name:c1-Type:p"].ShouldBe(555); pm.OnCompleted(); - results.Clear(); - ml.RecordObservableInstruments(); - results.Count.ShouldBe(0); + ml.Results.Clear(); + ml.RecordObservableInstruments(this.Output); + 
ml.Results.Count.ShouldBe(0); + } + + [Fact] + public void TopParMetricsTests() + { + using MeterHelper ml = new(Statistics.TopicPartitionMeterName); + + ConsumerMetrics cm = new(); + + Statistics statistics = new Statistics { ClientId = "c1", Name = "n1", Type = "c", ConsumedMessagesTotal = 123 }; + TopicStatistics ts = new TopicStatistics { Name = "t1" }; + PartitionStatistics ps = new PartitionStatistics { Id = 2, ConsumerLag = 445 }; + + ts.Partitions.Add(ps.Id, ps); + + statistics.Topics.Add(ts.Name, ts); + + cm.OnNext(statistics); + + ml.RecordObservableInstruments(this.Output); + + ml.Results.Count.ShouldBe(1); + ml.Results["epam_kafka_stats_tp_lag_Handler:n1-Name:c1-Type:c-Topic:t1-Partition:2"].ShouldBe(445); + + statistics.Topics.Clear(); + + cm.OnNext(statistics); + + ml.Results.Clear(); + ml.RecordObservableInstruments(this.Output); + ml.Results.Count.ShouldBe(0); + + cm.OnCompleted(); } } \ No newline at end of file
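The two meter names introduced above (Statistics.TopLevelMeterName and Statistics.TopicPartitionMeterName) can be consumed from an application with a plain MeterListener, in the same way the MeterHelper test utility does. Below is a minimal sketch, not part of this patch: the class name and console output are illustrative, and it assumes clients are created through KafkaFactory with StatisticsIntervalMs enabled and that the host triggers collection itself (Meter.Tags requires System.Diagnostics.DiagnosticSource 8.0+, which the test helper already relies on).

using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using System.Linq;
using Epam.Kafka;

internal static class KafkaStatsListenerSample
{
    public static MeterListener StartListening()
    {
        var listener = new MeterListener();

        // Enable only the instruments published by the library's two statistics meters.
        listener.InstrumentPublished = (instrument, l) =>
        {
            if (instrument.Meter.Name == Statistics.TopLevelMeterName ||
                instrument.Meter.Name == Statistics.TopicPartitionMeterName)
            {
                l.EnableMeasurementEvents(instrument);
            }
        };

        // All instruments registered by StatisticsMetrics report long measurements.
        listener.SetMeasurementEventCallback<long>((instrument, measurement, tags, _) =>
        {
            // Meter-level tags carry Name/Handler/Type; per-measurement tags add
            // Topic/Partition for the topic-partition meter.
            IEnumerable<string> allTags =
                (instrument.Meter.Tags ?? Enumerable.Empty<KeyValuePair<string, object?>>())
                .Concat(tags.ToArray())
                .Select(t => $"{t.Key}={t.Value}");

            Console.WriteLine($"{instrument.Name} = {measurement} [{string.Join(", ", allTags)}]");
        });

        listener.Start();
        return listener;
    }
}

Because every instrument here is observable, values are pulled rather than pushed: call listener.RecordObservableInstruments() on a timer, or register the meter names with a metrics exporter (for example the OpenTelemetry .NET SDK), which performs the periodic collection for you.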
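Applications that want the parsed statistics rather than aggregated metrics can still subscribe directly for Statistics objects; the refactored ParseStatsJsonObserver now fans a single parsed stream out to all such subscribers. A hypothetical observer is sketched below: LagLogger and its console output are illustrative only, while the IObserver<Statistics> contract, the Topics dictionary, and the ConsumerLag field come from the library as shown in this patch.

using System;
using Epam.Kafka;

internal sealed class LagLogger : IObserver<Statistics>
{
    public void OnNext(Statistics value)
    {
        // Statistics.Topics is keyed by topic name; partitions are keyed by partition id.
        foreach (var topic in value.Topics.Values)
        {
            foreach (var partition in topic.Partitions.Values)
            {
                Console.WriteLine($"{topic.Name} [{partition.Id}] lag={partition.ConsumerLag}");
            }
        }
    }

    public void OnError(Exception error)
    {
        // Raised when the statistics JSON could not be parsed.
    }

    public void OnCompleted()
    {
        // Raised when the owning consumer or producer is disposed.
    }
}

// Usage: consumers and producers created by KafkaFactory implement IObservable<Statistics>,
// unless a statistics handler was set explicitly on the builder.
// using IDisposable subscription = ((IObservable<Statistics>)consumer).Subscribe(new LagLogger());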