Skip to content

Commit

Permalink
add desired tag to top par metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
IharYakimush committed Jan 31, 2025
1 parent 8a86787 commit 1989688
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 27 deletions.
5 changes: 5 additions & 0 deletions src/Epam.Kafka/Metrics/ConsumerMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
using System.Diagnostics.Metrics;

using Confluent.Kafka;

using Epam.Kafka.Stats;

namespace Epam.Kafka.Metrics;

internal sealed class ConsumerMetrics : CommonMetrics
{
private const string DesiredTagName = "Desired";
private const string FetchTagName = "Fetch";
private const string TopicTagName = "Topic";
private const string PartitionTagName = "Partition";
private const string ConsumerGroupTagName = "Group";
Expand Down Expand Up @@ -63,6 +66,8 @@ private void CreateTpGauge(Meter meter, string name,
.Select(x => new KeyValuePair<TopicStatistics, PartitionStatistics>(p.Value, x.Value)))
.Select(m => new Measurement<long>(factory(m), new[]
{
new KeyValuePair<string, object?>(DesiredTagName, m.Value.Desired),
new KeyValuePair<string, object?>(FetchTagName, m.Value.FetchState),
new KeyValuePair<string, object?>(TopicTagName, m.Key.Name),
new KeyValuePair<string, object?>(PartitionTagName, m.Value.Id),
new KeyValuePair<string, object?>(ConsumerGroupTagName, this._config.GroupId)
Expand Down
2 changes: 2 additions & 0 deletions tests/Epam.Kafka.Tests/Common/MeterHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public MeterHelper(string meterName)

public void RecordObservableInstruments(ITestOutputHelper? output = null)
{
this.Results.Clear();

this._listener.RecordObservableInstruments();

if (output != null)
Expand Down
24 changes: 0 additions & 24 deletions tests/Epam.Kafka.Tests/KafkaFactoryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -493,30 +493,6 @@ public void CreateDefaultClients()
Assert.NotNull(dc);
}

[Fact]
public void CreateDefaultClientWithMetrics()
{
MockCluster.AddMockCluster(this).WithClusterConfig(MockCluster.ClusterName)
.Configure(x => x.ClientConfig.StatisticsIntervalMs = 100);

using MeterHelper ml = new(Statistics.TopLevelMeterName);
ml.RecordObservableInstruments();
ml.Results.Count.ShouldBe(0);

using IClient c1 = this.KafkaFactory.GetOrCreateClient();
Assert.NotNull(c1);
Task.Delay(200).Wait();
ml.RecordObservableInstruments(this.Output);

ml.Results.Count.ShouldBe(4);

Task.Delay(1000).Wait();

ml.RecordObservableInstruments(this.Output);

ml.Results.Count.ShouldBe(4);
}

[Fact]
public void CreateDefaultClientsError()
{
Expand Down
166 changes: 166 additions & 0 deletions tests/Epam.Kafka.Tests/MetricsTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright © 2024 EPAM Systems

using Confluent.Kafka;

using Epam.Kafka.Tests.Common;

using Microsoft.Extensions.DependencyInjection;

using Shouldly;

using Xunit;
using Xunit.Abstractions;

namespace Epam.Kafka.Tests;

public class MetricsTests : TestWithServices
{
public MetricsTests(ITestOutputHelper output) : base(output)
{
}

[Fact]
public void CreateDefaultClientWithMetrics()
{
MockCluster.AddMockCluster(this).WithClusterConfig(MockCluster.ClusterName)
.Configure(x => x.ClientConfig.StatisticsIntervalMs = 100);

using MeterHelper ml = new(Statistics.TopLevelMeterName);
ml.RecordObservableInstruments();
ml.Results.Count.ShouldBe(0);

using IClient c1 = this.KafkaFactory.GetOrCreateClient();
Assert.NotNull(c1);
Task.Delay(200).Wait();

Check warning on line 34 in tests/Epam.Kafka.Tests/MetricsTests.cs

View workflow job for this annotation

GitHub Actions / build

Test methods should not use blocking task operations, as they can cause deadlocks. Use an async test method and await instead. (https://xunit.net/xunit.analyzers/rules/xUnit1031)

Check warning on line 34 in tests/Epam.Kafka.Tests/MetricsTests.cs

View workflow job for this annotation

GitHub Actions / build

Test methods should not use blocking task operations, as they can cause deadlocks. Use an async test method and await instead. (https://xunit.net/xunit.analyzers/rules/xUnit1031)

Check warning on line 34 in tests/Epam.Kafka.Tests/MetricsTests.cs

View workflow job for this annotation

GitHub Actions / build

Test methods should not use blocking task operations, as they can cause deadlocks. Use an async test method and await instead. (https://xunit.net/xunit.analyzers/rules/xUnit1031)
ml.RecordObservableInstruments(this.Output);

ml.Results.Count.ShouldBe(4);

Task.Delay(1000).Wait();

Check warning on line 39 in tests/Epam.Kafka.Tests/MetricsTests.cs

View workflow job for this annotation

GitHub Actions / build

Test methods should not use blocking task operations, as they can cause deadlocks. Use an async test method and await instead. (https://xunit.net/xunit.analyzers/rules/xUnit1031)

Check warning on line 39 in tests/Epam.Kafka.Tests/MetricsTests.cs

View workflow job for this annotation

GitHub Actions / build

Test methods should not use blocking task operations, as they can cause deadlocks. Use an async test method and await instead. (https://xunit.net/xunit.analyzers/rules/xUnit1031)

Check warning on line 39 in tests/Epam.Kafka.Tests/MetricsTests.cs

View workflow job for this annotation

GitHub Actions / build

Test methods should not use blocking task operations, as they can cause deadlocks. Use an async test method and await instead. (https://xunit.net/xunit.analyzers/rules/xUnit1031)

ml.RecordObservableInstruments(this.Output);

ml.Results.Count.ShouldBe(4);
}

[Fact]
public void ConsumerTopParMetricsAssign()
{
this.Services.AddKafka(false).WithTestMockCluster(MockCluster.ClusterName);

using MeterHelper ml = new(Statistics.TopicPartitionMeterName);
ml.RecordObservableInstruments();
ml.Results.Count.ShouldBe(0);

this.ServiceProvider.GetRequiredKeyedService<TestMockCluster>(MockCluster.ClusterName).SeedTopic("test1");

using IConsumer<Ignore, Ignore> consumer =
this.KafkaFactory.CreateConsumer<Ignore, Ignore>(new ConsumerConfig
{
GroupId = "qwe",
StatisticsIntervalMs = 100
}, MockCluster.ClusterName);

// No assigned topic partitions
Task.Delay(200).Wait();

Check warning on line 65 in tests/Epam.Kafka.Tests/MetricsTests.cs

View workflow job for this annotation

GitHub Actions / build

Test methods should not use blocking task operations, as they can cause deadlocks. Use an async test method and await instead. (https://xunit.net/xunit.analyzers/rules/xUnit1031)

Check warning on line 65 in tests/Epam.Kafka.Tests/MetricsTests.cs

View workflow job for this annotation

GitHub Actions / build

Test methods should not use blocking task operations, as they can cause deadlocks. Use an async test method and await instead. (https://xunit.net/xunit.analyzers/rules/xUnit1031)
ml.RecordObservableInstruments();
ml.Results.Count.ShouldBe(0);

// One 1 of 4 assigned
consumer.Assign(new TopicPartition("test1", 1));
consumer.Consume(200);
ml.RecordObservableInstruments(this.Output);
ml.Results.Count.ShouldBe(1);
ml.Results.Keys.Single().ShouldContain("Fetch:offset-query");
ml.Results.Keys.Single().ShouldContain("Desired:true");

// No assigned topic partitions
consumer.Unassign();
consumer.Consume(200);
ml.RecordObservableInstruments();
ml.Results.Count.ShouldBe(0);
}

[Theory]
[InlineData(PartitionAssignmentStrategy.CooperativeSticky)]
[InlineData(PartitionAssignmentStrategy.RoundRobin)]
public void ConsumerTopParMetricsSubscribe(PartitionAssignmentStrategy strategy)
{
this.Services.AddKafka(false).WithTestMockCluster(MockCluster.ClusterName);

using MeterHelper ml = new(Statistics.TopicPartitionMeterName);
ml.RecordObservableInstruments();
ml.Results.Count.ShouldBe(0);

this.ServiceProvider.GetRequiredKeyedService<TestMockCluster>(MockCluster.ClusterName).SeedTopic("test1");

using IConsumer<Ignore, Ignore> c1 =
this.KafkaFactory.CreateConsumer<Ignore, Ignore>(new ConsumerConfig
{
PartitionAssignmentStrategy = strategy,
ClientId = "c1",
GroupId = "qwe",
StatisticsIntervalMs = 100,
SessionTimeoutMs = 5000,
MaxPollIntervalMs = 7000
},
MockCluster.ClusterName,
b =>
{
b.SetPartitionsAssignedHandler((c, list) =>
this.Output.WriteLine($"{c.Name} assigned {list.Count}: {string.Join(",", list)}"));

b.SetPartitionsRevokedHandler((c, list) =>
this.Output.WriteLine($"{c.Name} revoked {list.Count}"));
});

using IConsumer<Ignore, Ignore> c2 =
this.KafkaFactory.CreateConsumer<Ignore, Ignore>(new ConsumerConfig
{
PartitionAssignmentStrategy = strategy,
ClientId = "c2",
GroupId = "qwe",
SessionTimeoutMs = 5000,
MaxPollIntervalMs = 7000
}, MockCluster.ClusterName,
b =>
{
b.SetPartitionsAssignedHandler((c, list) =>
this.Output.WriteLine($"{c.Name} assigned {list.Count}: {string.Join(",", list)}"));

b.SetPartitionsRevokedHandler((c, list) =>
this.Output.WriteLine($"{c.Name} revoked {list.Count}"));
});

void ConsumeLoop(int count, params IConsumer<Ignore, Ignore>[] consumers)
{
for (int i = 0; i < count; i++)
{
foreach (var c in consumers)
{
c.Consume(100);
}
}
}

// No assigned topic partitions
Task.Delay(200).Wait();

Check warning on line 147 in tests/Epam.Kafka.Tests/MetricsTests.cs

View workflow job for this annotation

GitHub Actions / build

Test methods should not use blocking task operations, as they can cause deadlocks. Use an async test method and await instead. (https://xunit.net/xunit.analyzers/rules/xUnit1031)

Check warning on line 147 in tests/Epam.Kafka.Tests/MetricsTests.cs

View workflow job for this annotation

GitHub Actions / build

Test methods should not use blocking task operations, as they can cause deadlocks. Use an async test method and await instead. (https://xunit.net/xunit.analyzers/rules/xUnit1031)
ml.RecordObservableInstruments();
ml.Results.Count.ShouldBe(0);

// One 1 of 4 assigned
c1.Subscribe("test1");
ConsumeLoop(30, c1);
ml.RecordObservableInstruments(this.Output);
ml.Results.Count.ShouldBe(4);
ml.Results.Count(x => x.Key.Contains("Desired:True")).ShouldBe(4);

// No assigned topic partitions
c2.Subscribe("test1");
ConsumeLoop(80, c1, c2);

ml.RecordObservableInstruments(this.Output);
ml.Results.Count.ShouldBe(4);
ml.Results.Count(x => x.Key.Contains("Desired:True")).ShouldBe(2);
}
}
6 changes: 3 additions & 3 deletions tests/Epam.Kafka.Tests/StatisticsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void TopLevelMetricsTests()
ConsumerMetrics cm = new(new ConsumerConfig());
ProducerMetrics pm = new();

cm.OnNext(new Statistics { ClientId = "c1", Name = "n1", Type = "c", ConsumedMessagesTotal = 123, OpsQueueCountGauge = 332});
cm.OnNext(new Statistics { ClientId = "c1", Name = "n1", Type = "c", ConsumedMessagesTotal = 123, OpsQueueCountGauge = 332 });
pm.OnNext(new Statistics { ClientId = "c1", Name = "n2", Type = "p", TransmittedMessagesTotal = 111 });

ml.RecordObservableInstruments(this.Output);
Expand Down Expand Up @@ -139,7 +139,7 @@ public void TopParMetricsTests()

Statistics statistics = new Statistics { ClientId = "c1", Name = "n1", Type = "c", ConsumedMessagesTotal = 123 };
TopicStatistics ts = new TopicStatistics { Name = "t1" };
PartitionStatistics ps = new PartitionStatistics { Id = 2, ConsumerLag = 445 };
PartitionStatistics ps = new PartitionStatistics { Id = 2, ConsumerLag = 445, Desired = true, FetchState = "active" };

ts.Partitions.Add(ps.Id, ps);

Expand All @@ -150,7 +150,7 @@ public void TopParMetricsTests()
ml.RecordObservableInstruments(this.Output);

ml.Results.Count.ShouldBe(1);
ml.Results["epam_kafka_stats_tp_lag_Handler:n1-Name:c1-Type:c-Topic:t1-Partition:2-Group:qwe"].ShouldBe(445);
ml.Results["epam_kafka_stats_tp_lag_Handler:n1-Name:c1-Type:c-Desired:True-Fetch:active-Topic:t1-Partition:2-Group:qwe"].ShouldBe(445);

statistics.Topics.Clear();

Expand Down

0 comments on commit 1989688

Please sign in to comment.