diff --git a/src/Epam.Kafka/Metrics/ConsumerMetrics.cs b/src/Epam.Kafka/Metrics/ConsumerMetrics.cs index e1149e8..0c2b921 100644 --- a/src/Epam.Kafka/Metrics/ConsumerMetrics.cs +++ b/src/Epam.Kafka/Metrics/ConsumerMetrics.cs @@ -3,12 +3,15 @@ using System.Diagnostics.Metrics; using Confluent.Kafka; + using Epam.Kafka.Stats; namespace Epam.Kafka.Metrics; internal sealed class ConsumerMetrics : CommonMetrics { + private const string DesiredTagName = "Desired"; + private const string FetchTagName = "Fetch"; private const string TopicTagName = "Topic"; private const string PartitionTagName = "Partition"; private const string ConsumerGroupTagName = "Group"; @@ -63,6 +66,8 @@ private void CreateTpGauge(Meter meter, string name, .Select(x => new KeyValuePair(p.Value, x.Value))) .Select(m => new Measurement(factory(m), new[] { + new KeyValuePair(DesiredTagName, m.Value.Desired), + new KeyValuePair(FetchTagName, m.Value.FetchState), new KeyValuePair(TopicTagName, m.Key.Name), new KeyValuePair(PartitionTagName, m.Value.Id), new KeyValuePair(ConsumerGroupTagName, this._config.GroupId) diff --git a/tests/Epam.Kafka.Tests/Common/MeterHelper.cs b/tests/Epam.Kafka.Tests/Common/MeterHelper.cs index fe39c87..862a3df 100644 --- a/tests/Epam.Kafka.Tests/Common/MeterHelper.cs +++ b/tests/Epam.Kafka.Tests/Common/MeterHelper.cs @@ -41,6 +41,8 @@ public MeterHelper(string meterName) public void RecordObservableInstruments(ITestOutputHelper? 
output = null) { + this.Results.Clear(); + this._listener.RecordObservableInstruments(); if (output != null) diff --git a/tests/Epam.Kafka.Tests/KafkaFactoryTests.cs b/tests/Epam.Kafka.Tests/KafkaFactoryTests.cs index c41958e..8db1b6f 100644 --- a/tests/Epam.Kafka.Tests/KafkaFactoryTests.cs +++ b/tests/Epam.Kafka.Tests/KafkaFactoryTests.cs @@ -493,30 +493,6 @@ public void CreateDefaultClients() Assert.NotNull(dc); } - [Fact] - public void CreateDefaultClientWithMetrics() - { - MockCluster.AddMockCluster(this).WithClusterConfig(MockCluster.ClusterName) - .Configure(x => x.ClientConfig.StatisticsIntervalMs = 100); - - using MeterHelper ml = new(Statistics.TopLevelMeterName); - ml.RecordObservableInstruments(); - ml.Results.Count.ShouldBe(0); - - using IClient c1 = this.KafkaFactory.GetOrCreateClient(); - Assert.NotNull(c1); - Task.Delay(200).Wait(); - ml.RecordObservableInstruments(this.Output); - - ml.Results.Count.ShouldBe(4); - - Task.Delay(1000).Wait(); - - ml.RecordObservableInstruments(this.Output); - - ml.Results.Count.ShouldBe(4); - } - [Fact] public void CreateDefaultClientsError() { diff --git a/tests/Epam.Kafka.Tests/MetricsTests.cs b/tests/Epam.Kafka.Tests/MetricsTests.cs new file mode 100644 index 0000000..f3e99a8 --- /dev/null +++ b/tests/Epam.Kafka.Tests/MetricsTests.cs @@ -0,0 +1,166 @@ +// Copyright © 2024 EPAM Systems + +using Confluent.Kafka; + +using Epam.Kafka.Tests.Common; + +using Microsoft.Extensions.DependencyInjection; + +using Shouldly; + +using Xunit; +using Xunit.Abstractions; + +namespace Epam.Kafka.Tests; + +public class MetricsTests : TestWithServices +{ + public MetricsTests(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public void CreateDefaultClientWithMetrics() + { + MockCluster.AddMockCluster(this).WithClusterConfig(MockCluster.ClusterName) + .Configure(x => x.ClientConfig.StatisticsIntervalMs = 100); + + using MeterHelper ml = new(Statistics.TopLevelMeterName); + ml.RecordObservableInstruments(); + 
ml.Results.Count.ShouldBe(0); + + using IClient c1 = this.KafkaFactory.GetOrCreateClient(); + Assert.NotNull(c1); + Task.Delay(200).Wait(); + ml.RecordObservableInstruments(this.Output); + + ml.Results.Count.ShouldBe(4); + + Task.Delay(1000).Wait(); + + ml.RecordObservableInstruments(this.Output); + + ml.Results.Count.ShouldBe(4); + } + + [Fact] + public void ConsumerTopParMetricsAssign() + { + this.Services.AddKafka(false).WithTestMockCluster(MockCluster.ClusterName); + + using MeterHelper ml = new(Statistics.TopicPartitionMeterName); + ml.RecordObservableInstruments(); + ml.Results.Count.ShouldBe(0); + + this.ServiceProvider.GetRequiredKeyedService(MockCluster.ClusterName).SeedTopic("test1"); + + using IConsumer consumer = + this.KafkaFactory.CreateConsumer(new ConsumerConfig + { + GroupId = "qwe", + StatisticsIntervalMs = 100 + }, MockCluster.ClusterName); + + // No assigned topic partitions + Task.Delay(200).Wait(); + ml.RecordObservableInstruments(); + ml.Results.Count.ShouldBe(0); + + // Only 1 of 4 partitions assigned + consumer.Assign(new TopicPartition("test1", 1)); + consumer.Consume(200); + ml.RecordObservableInstruments(this.Output); + ml.Results.Count.ShouldBe(1); + ml.Results.Keys.Single().ShouldContain("Fetch:offset-query"); + ml.Results.Keys.Single().ShouldContain("Desired:True"); + + // No assigned topic partitions + consumer.Unassign(); + consumer.Consume(200); + ml.RecordObservableInstruments(); + ml.Results.Count.ShouldBe(0); + } + + [Theory] + [InlineData(PartitionAssignmentStrategy.CooperativeSticky)] + [InlineData(PartitionAssignmentStrategy.RoundRobin)] + public void ConsumerTopParMetricsSubscribe(PartitionAssignmentStrategy strategy) + { + this.Services.AddKafka(false).WithTestMockCluster(MockCluster.ClusterName); + + using MeterHelper ml = new(Statistics.TopicPartitionMeterName); + ml.RecordObservableInstruments(); + ml.Results.Count.ShouldBe(0); + + this.ServiceProvider.GetRequiredKeyedService(MockCluster.ClusterName).SeedTopic("test1"); + + using
IConsumer c1 = + this.KafkaFactory.CreateConsumer(new ConsumerConfig + { + PartitionAssignmentStrategy = strategy, + ClientId = "c1", + GroupId = "qwe", + StatisticsIntervalMs = 100, + SessionTimeoutMs = 5000, + MaxPollIntervalMs = 7000 + }, + MockCluster.ClusterName, + b => + { + b.SetPartitionsAssignedHandler((c, list) => + this.Output.WriteLine($"{c.Name} assigned {list.Count}: {string.Join(",", list)}")); + + b.SetPartitionsRevokedHandler((c, list) => + this.Output.WriteLine($"{c.Name} revoked {list.Count}")); + }); + + using IConsumer c2 = + this.KafkaFactory.CreateConsumer(new ConsumerConfig + { + PartitionAssignmentStrategy = strategy, + ClientId = "c2", + GroupId = "qwe", + SessionTimeoutMs = 5000, + MaxPollIntervalMs = 7000 + }, MockCluster.ClusterName, + b => + { + b.SetPartitionsAssignedHandler((c, list) => + this.Output.WriteLine($"{c.Name} assigned {list.Count}: {string.Join(",", list)}")); + + b.SetPartitionsRevokedHandler((c, list) => + this.Output.WriteLine($"{c.Name} revoked {list.Count}")); + }); + + void ConsumeLoop(int count, params IConsumer[] consumers) + { + for (int i = 0; i < count; i++) + { + foreach (var c in consumers) + { + c.Consume(100); + } + } + } + + // No assigned topic partitions + Task.Delay(200).Wait(); + ml.RecordObservableInstruments(); + ml.Results.Count.ShouldBe(0); + + // All 4 partitions assigned to c1 + c1.Subscribe("test1"); + ConsumeLoop(30, c1); + ml.RecordObservableInstruments(this.Output); + ml.Results.Count.ShouldBe(4); + ml.Results.Count(x => x.Key.Contains("Desired:True")).ShouldBe(4); + + // Partitions rebalanced between c1 and c2, so only 2 remain desired by c1 + c2.Subscribe("test1"); + ConsumeLoop(80, c1, c2); + + ml.RecordObservableInstruments(this.Output); + ml.Results.Count.ShouldBe(4); + ml.Results.Count(x => x.Key.Contains("Desired:True")).ShouldBe(2); + } +} \ No newline at end of file diff --git a/tests/Epam.Kafka.Tests/StatisticsTests.cs b/tests/Epam.Kafka.Tests/StatisticsTests.cs index ae4420c..1619860 100644 ---
a/tests/Epam.Kafka.Tests/StatisticsTests.cs +++ b/tests/Epam.Kafka.Tests/StatisticsTests.cs @@ -100,7 +100,7 @@ public void TopLevelMetricsTests() ConsumerMetrics cm = new(new ConsumerConfig()); ProducerMetrics pm = new(); - cm.OnNext(new Statistics { ClientId = "c1", Name = "n1", Type = "c", ConsumedMessagesTotal = 123, OpsQueueCountGauge = 332}); + cm.OnNext(new Statistics { ClientId = "c1", Name = "n1", Type = "c", ConsumedMessagesTotal = 123, OpsQueueCountGauge = 332 }); pm.OnNext(new Statistics { ClientId = "c1", Name = "n2", Type = "p", TransmittedMessagesTotal = 111 }); ml.RecordObservableInstruments(this.Output); @@ -139,7 +139,7 @@ public void TopParMetricsTests() Statistics statistics = new Statistics { ClientId = "c1", Name = "n1", Type = "c", ConsumedMessagesTotal = 123 }; TopicStatistics ts = new TopicStatistics { Name = "t1" }; - PartitionStatistics ps = new PartitionStatistics { Id = 2, ConsumerLag = 445 }; + PartitionStatistics ps = new PartitionStatistics { Id = 2, ConsumerLag = 445, Desired = true, FetchState = "active" }; ts.Partitions.Add(ps.Id, ps); @@ -150,7 +150,7 @@ public void TopParMetricsTests() ml.RecordObservableInstruments(this.Output); ml.Results.Count.ShouldBe(1); - ml.Results["epam_kafka_stats_tp_lag_Handler:n1-Name:c1-Type:c-Topic:t1-Partition:2-Group:qwe"].ShouldBe(445); + ml.Results["epam_kafka_stats_tp_lag_Handler:n1-Name:c1-Type:c-Desired:True-Fetch:active-Topic:t1-Partition:2-Group:qwe"].ShouldBe(445); statistics.Topics.Clear();