From 3089362d92819b2b16853f4c01abbd9fac1008fc Mon Sep 17 00:00:00 2001 From: Ihar Yakimush Date: Fri, 31 Jan 2025 16:38:04 +0300 Subject: [PATCH] producer transaction metrics --- src/Epam.Kafka/Internals/KafkaFactory.cs | 2 +- .../Observable/ObservableProducer.cs | 4 +- src/Epam.Kafka/Metrics/CommonMetrics.cs | 2 +- src/Epam.Kafka/Metrics/ConsumerMetrics.cs | 52 ++++++---------- src/Epam.Kafka/Metrics/ProducerMetrics.cs | 37 +++++++++++ src/Epam.Kafka/Metrics/StatisticsMetrics.cs | 7 ++- src/Epam.Kafka/Statistics.cs | 9 ++- .../Epam.Kafka.Tests/Epam.Kafka.approved.txt | 3 +- tests/Epam.Kafka.Tests/MetricsTests.cs | 62 +++++++++++++++++-- tests/Epam.Kafka.Tests/StatisticsTests.cs | 24 ++++++- 10 files changed, 155 insertions(+), 47 deletions(-) diff --git a/src/Epam.Kafka/Internals/KafkaFactory.cs b/src/Epam.Kafka/Internals/KafkaFactory.cs index c1c9b70..0f30081 100644 --- a/src/Epam.Kafka/Internals/KafkaFactory.cs +++ b/src/Epam.Kafka/Internals/KafkaFactory.cs @@ -226,7 +226,7 @@ public IProducer CreateProducer(ProducerConfig confi try { - producer = new(builder); + producer = new(builder, config); logger.ProducerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue), oauthSet, logSet); } diff --git a/src/Epam.Kafka/Internals/Observable/ObservableProducer.cs b/src/Epam.Kafka/Internals/Observable/ObservableProducer.cs index 6f1f2cd..7854f0a 100644 --- a/src/Epam.Kafka/Internals/Observable/ObservableProducer.cs +++ b/src/Epam.Kafka/Internals/Observable/ObservableProducer.cs @@ -10,7 +10,7 @@ internal class ObservableProducer : ObservableClient, IProducer _inner; - public ObservableProducer(ProducerBuilder builder) + public ObservableProducer(ProducerBuilder builder, ProducerConfig config) { if (builder == null) throw new ArgumentNullException(nameof(builder)); @@ -30,7 +30,7 @@ public ObservableProducer(ProducerBuilder builder) this.StatObservers = new List>(); #pragma warning disable CA2000 // unsubscribe not needed - this.Subscribe(new ProducerMetrics()); + this.Subscribe(new ProducerMetrics(config)); #pragma warning restore CA2000 } catch (InvalidOperationException) diff --git a/src/Epam.Kafka/Metrics/CommonMetrics.cs b/src/Epam.Kafka/Metrics/CommonMetrics.cs index 42fe262..37f8e4a 100644 --- a/src/Epam.Kafka/Metrics/CommonMetrics.cs +++ b/src/Epam.Kafka/Metrics/CommonMetrics.cs @@ -6,7 +6,7 @@ namespace Epam.Kafka.Metrics; internal abstract class CommonMetrics : StatisticsMetrics { - protected override void Initialize(Meter meter, Meter topParMeter) + protected override void Initialize(Meter meter, Meter topParMeter, Meter transactionMeter) { this.CreateCounter(meter, "epam_kafka_stats_trx_msgs", this.GetTxRxMsg, description: "Number of messages consumed or produced."); diff --git a/src/Epam.Kafka/Metrics/ConsumerMetrics.cs b/src/Epam.Kafka/Metrics/ConsumerMetrics.cs index 0c2b921..4acd5b6 100644 --- a/src/Epam.Kafka/Metrics/ConsumerMetrics.cs +++ b/src/Epam.Kafka/Metrics/ConsumerMetrics.cs @@ -23,38 +23,11 @@ public ConsumerMetrics(ConsumerConfig config) this._config = config ?? throw new ArgumentNullException(nameof(config)); } - protected override void Initialize(Meter meter, Meter topParMeter) + protected override void Initialize(Meter meter, Meter topParMeter, Meter transactionMeter) { - base.Initialize(meter, topParMeter); + base.Initialize(meter, topParMeter, transactionMeter); - this.CreateTpGauge(topParMeter, "epam_kafka_stats_tp_lag", - m => m.Value.ConsumerLag, null, "Consumer lag"); - } - - protected override long GetTxRxMsg(Statistics value) - { - return value.ConsumedMessagesTotal; - } - - protected override long GetTxRx(Statistics value) - { - return value.ConsumedRequestsTotal; - } - - protected override long GetTxRxBytes(Statistics value) - { - return value.ConsumedBytesTotal; - } - - private void CreateTpGauge(Meter meter, string name, - Func, long> factory, - string? unit = null, - string? description = null) - { - if (meter == null) throw new ArgumentNullException(nameof(meter)); - if (name == null) throw new ArgumentNullException(nameof(name)); - - meter.CreateObservableGauge(name, () => + topParMeter.CreateObservableGauge("epam_kafka_stats_tp_lag", () => { Statistics? v = this.Value; @@ -64,7 +37,7 @@ private void CreateTpGauge(Meter meter, string name, .SelectMany(p => p.Value.Partitions.Where(x => x.Key != PartitionStatistics.InternalUnassignedPartition) .Select(x => new KeyValuePair(p.Value, x.Value))) - .Select(m => new Measurement(factory(m), new[] + .Select(m => new Measurement(m.Value.ConsumerLag, new[] { new KeyValuePair(DesiredTagName, m.Value.Desired), new KeyValuePair(FetchTagName, m.Value.FetchState), @@ -75,6 +48,21 @@ private void CreateTpGauge(Meter meter, string name, } return Empty; - }, unit, description); + }, null, "Consumer lag"); + } + + protected override long GetTxRxMsg(Statistics value) + { + return value.ConsumedMessagesTotal; + } + + protected override long GetTxRx(Statistics value) + { + return value.ConsumedRequestsTotal; + } + + protected override long GetTxRxBytes(Statistics value) + { + return value.ConsumedBytesTotal; } } \ No newline at end of file diff --git a/src/Epam.Kafka/Metrics/ProducerMetrics.cs b/src/Epam.Kafka/Metrics/ProducerMetrics.cs index 93f93cc..22e979e 100644 --- a/src/Epam.Kafka/Metrics/ProducerMetrics.cs +++ b/src/Epam.Kafka/Metrics/ProducerMetrics.cs @@ -1,9 +1,46 @@ // Copyright © 2024 EPAM Systems +using System.Diagnostics.Metrics; + +using Confluent.Kafka; + namespace Epam.Kafka.Metrics; internal sealed class ProducerMetrics : CommonMetrics { + private const string EnqueueTagName = "Enqueue"; + private const string TransactionTagName = "Transaction"; + private const string StateTagName = "State"; + + private readonly ProducerConfig _config; + + public ProducerMetrics(ProducerConfig config) + { + this._config = config ?? throw new ArgumentNullException(nameof(config)); + } + + protected override void Initialize(Meter meter, Meter topParMeter, Meter transactionMeter) + { + base.Initialize(meter, topParMeter, transactionMeter); + + transactionMeter.CreateObservableGauge("epam_kafka_stats_eos_txn_age", () => + { + Statistics? v = this.Value; + + if (v != null && !string.IsNullOrWhiteSpace(this._config.TransactionalId)) + { + return Enumerable.Repeat(new Measurement(v.ProducerTransaction.TransactionAgeMilliseconds / 1000, + new[] + { + new KeyValuePair(EnqueueTagName, v.ProducerTransaction.EnqAllowed), + new KeyValuePair(StateTagName, v.ProducerTransaction.TransactionState), + new KeyValuePair(TransactionTagName, this._config.TransactionalId) + }), 1); + } + + return Empty; + }, "seconds", "Transactional producer state age seconds"); + } protected override long GetTxRxMsg(Statistics value) { diff --git a/src/Epam.Kafka/Metrics/StatisticsMetrics.cs b/src/Epam.Kafka/Metrics/StatisticsMetrics.cs index 28318ce..1031058 100644 --- a/src/Epam.Kafka/Metrics/StatisticsMetrics.cs +++ b/src/Epam.Kafka/Metrics/StatisticsMetrics.cs @@ -20,6 +20,7 @@ internal abstract class StatisticsMetrics : IObserver private bool _initialized; private Meter? _topLevelMeter; private Meter? _topParMeter; + private Meter? _transactionMeter; protected Statistics? Value { get; private set; } protected static IEnumerable> Empty { get; } = Enumerable.Empty>(); @@ -47,8 +48,9 @@ public void OnNext(Statistics value) this._topLevelMeter = new Meter(Statistics.TopLevelMeterName, null, topLevelTags); this._topParMeter = new Meter(Statistics.TopicPartitionMeterName, null, topLevelTags); + this._transactionMeter = new Meter(Statistics.TransactionMeterName, null, topLevelTags); - this.Initialize(this._topLevelMeter, this._topParMeter); + this.Initialize(this._topLevelMeter, this._topParMeter, this._transactionMeter); this._initialized = true; } @@ -56,7 +58,7 @@ public void OnNext(Statistics value) } } - protected abstract void Initialize(Meter meter, Meter topParMeter); + protected abstract void Initialize(Meter meter, Meter topParMeter, Meter transactionMeter); public void OnError(Exception error) { @@ -67,6 +69,7 @@ public void OnCompleted() { this._topLevelMeter?.Dispose(); this._topParMeter?.Dispose(); + this._transactionMeter?.Dispose(); } protected void CreateGauge(Meter meter, string name, Func factory, string? unit = null, string? description = null) diff --git a/src/Epam.Kafka/Statistics.cs b/src/Epam.Kafka/Statistics.cs index 52288fd..6049b2f 100644 --- a/src/Epam.Kafka/Statistics.cs +++ b/src/Epam.Kafka/Statistics.cs @@ -19,10 +19,15 @@ public class Statistics public const string TopLevelMeterName = "Epam.Kafka.Statistics"; /// - /// Name of used to expose top topic partition statistics. + /// Name of used to expose topic partition statistics. /// public const string TopicPartitionMeterName = "Epam.Kafka.Statistics.TopicPartition"; + /// + /// Name of used to expose producer transaction statistics. + /// + public const string TransactionMeterName = "Epam.Kafka.Statistics.Transaction"; + /// /// Create new instance of object from json representation. /// @@ -189,5 +194,5 @@ public static Statistics FromJson(string json) /// EOS / Idempotent producer state and metrics. /// [JsonPropertyName("eos")] - public GroupStatistics ProducerTransaction { get; } = new(); + public TransactionStatistics ProducerTransaction { get; } = new(); } \ No newline at end of file diff --git a/tests/Epam.Kafka.Tests/Epam.Kafka.approved.txt b/tests/Epam.Kafka.Tests/Epam.Kafka.approved.txt index 80281ea..2439882 100644 --- a/tests/Epam.Kafka.Tests/Epam.Kafka.approved.txt +++ b/tests/Epam.Kafka.Tests/Epam.Kafka.approved.txt @@ -62,6 +62,7 @@ namespace Epam.Kafka { public const string TopLevelMeterName = "Epam.Kafka.Statistics"; public const string TopicPartitionMeterName = "Epam.Kafka.Statistics.TopicPartition"; + public const string TransactionMeterName = "Epam.Kafka.Statistics.Transaction"; public Statistics() { } [System.Text.Json.Serialization.JsonPropertyName("age")] public long AgeMicroseconds { get; set; } @@ -92,7 +93,7 @@ namespace Epam.Kafka [System.Text.Json.Serialization.JsonPropertyName("msg_size_max")] public long ProducerQueueSizeMax { get; set; } [System.Text.Json.Serialization.JsonPropertyName("eos")] - public Epam.Kafka.Stats.GroupStatistics ProducerTransaction { get; } + public Epam.Kafka.Stats.TransactionStatistics ProducerTransaction { get; } [System.Text.Json.Serialization.JsonIgnore] public string RawJson { get; } [System.Text.Json.Serialization.JsonPropertyName("time")] diff --git a/tests/Epam.Kafka.Tests/MetricsTests.cs b/tests/Epam.Kafka.Tests/MetricsTests.cs index db089b8..5c14d60 100644 --- a/tests/Epam.Kafka.Tests/MetricsTests.cs +++ b/tests/Epam.Kafka.Tests/MetricsTests.cs @@ -20,7 +20,7 @@ public MetricsTests(ITestOutputHelper output) : base(output) } [Fact] - public void CreateDefaultClientWithMetrics() + public async Task CreateDefaultClientWithMetrics() { MockCluster.AddMockCluster(this).WithClusterConfig(MockCluster.ClusterName) .Configure(x => x.ClientConfig.StatisticsIntervalMs = 100); @@ -31,12 +31,12 @@ public void CreateDefaultClientWithMetrics() using IClient c1 = this.KafkaFactory.GetOrCreateClient(); Assert.NotNull(c1); - Task.Delay(200).Wait(); + await Task.Delay(200); ml.RecordObservableInstruments(this.Output); ml.Results.Count.ShouldBe(4); - Task.Delay(1000).Wait(); + await Task.Delay(1000); ml.RecordObservableInstruments(this.Output); @@ -44,7 +44,7 @@ public void CreateDefaultClientWithMetrics() } [Fact] - public void ConsumerTopParMetricsAssign() + public async Task ConsumerTopParMetricsAssign() { this.Services.AddKafka(false).WithTestMockCluster(MockCluster.ClusterName); @@ -62,7 +62,7 @@ public void ConsumerTopParMetricsAssign() }, MockCluster.ClusterName); // No assigned topic partitions - Task.Delay(200).Wait(); + await Task.Delay(200); ml.RecordObservableInstruments(); ml.Results.Count.ShouldBe(0); @@ -80,4 +80,56 @@ public void ConsumerTopParMetricsAssign() ml.RecordObservableInstruments(); ml.Results.Count.ShouldBe(0); } + + [Fact] + public async Task ProducerTransaction() + { + this.Services.AddKafka(false).WithTestMockCluster(MockCluster.ClusterName); + + using MeterHelper ml = new(Statistics.TransactionMeterName); + ml.RecordObservableInstruments(); + ml.Results.Count.ShouldBe(0); + + using IProducer producer = + this.KafkaFactory.CreateProducer(new ProducerConfig + { + TransactionalId = "qwe", + StatisticsIntervalMs = 100 + }, MockCluster.ClusterName); + + await Task.Delay(200); + ml.RecordObservableInstruments(this.Output); + ml.Results.Count.ShouldBe(1); + ml.Results.Keys.Single().ShouldContain("Type:producer-Enqueue:False-State:Init-Transaction:qwe"); + + // One 1 of 4 assigned + producer.InitTransactions(TimeSpan.FromSeconds(3)); + + await Task.Delay(200); + ml.RecordObservableInstruments(this.Output); + ml.Results.Count.ShouldBe(1); + ml.Results.Keys.Single().ShouldContain("Type:producer-Enqueue:False-State:Ready-Transaction:qwe"); + + producer.BeginTransaction(); + + await Task.Delay(200); + ml.RecordObservableInstruments(this.Output); + ml.Results.Count.ShouldBe(1); + ml.Results.Keys.Single().ShouldContain("Type:producer-Enqueue:True-State:InTransaction-Transaction:qwe"); + + await producer.ProduceAsync("test", new Message { Key = 1, Value = 2 }); + ml.RecordObservableInstruments(this.Output); + + await Task.Delay(200); + ml.RecordObservableInstruments(this.Output); + ml.Results.Count.ShouldBe(1); + //ml.Results.Keys.Single().ShouldContain("Type:producer-Enqueue:False-State:Init-Transaction:qwe"); + + producer.CommitTransaction(); + + await Task.Delay(200); + ml.RecordObservableInstruments(this.Output); + ml.Results.Count.ShouldBe(1); + //ml.Results.Keys.Single().ShouldContain("Type:producer-Enqueue:False-State:Init-Transaction:qwe"); + } } \ No newline at end of file diff --git a/tests/Epam.Kafka.Tests/StatisticsTests.cs b/tests/Epam.Kafka.Tests/StatisticsTests.cs index 1619860..f2b1a04 100644 --- a/tests/Epam.Kafka.Tests/StatisticsTests.cs +++ b/tests/Epam.Kafka.Tests/StatisticsTests.cs @@ -98,7 +98,7 @@ public void TopLevelMetricsTests() using MeterHelper ml = new(Statistics.TopLevelMeterName); ConsumerMetrics cm = new(new ConsumerConfig()); - ProducerMetrics pm = new(); + ProducerMetrics pm = new(new ProducerConfig()); cm.OnNext(new Statistics { ClientId = "c1", Name = "n1", Type = "c", ConsumedMessagesTotal = 123, OpsQueueCountGauge = 332 }); pm.OnNext(new Statistics { ClientId = "c1", Name = "n2", Type = "p", TransmittedMessagesTotal = 111 }); @@ -162,4 +162,26 @@ public void TopParMetricsTests() cm.OnCompleted(); } + + [Fact] + public void TransactionMetricsTests() + { + using MeterHelper ml = new(Statistics.TransactionMeterName); + + ProducerMetrics cm = new(new ProducerConfig { TransactionalId = "qwe" }); + + Statistics statistics = new Statistics { ClientId = "c1", Name = "n1", Type = "c", ConsumedMessagesTotal = 123 }; + statistics.ProducerTransaction.EnqAllowed = true; + statistics.ProducerTransaction.TransactionState = "test"; + statistics.ProducerTransaction.TransactionAgeMilliseconds = 120000; + + cm.OnNext(statistics); + + ml.RecordObservableInstruments(this.Output); + + ml.Results.Count.ShouldBe(1); + ml.Results["epam_kafka_stats_eos_txn_age_Handler:n1-Name:c1-Type:c-Enqueue:True-State:test-Transaction:qwe"].ShouldBe(120); + + cm.OnCompleted(); + } } \ No newline at end of file