Skip to content

Commit

Permalink
update metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
IharYakimush committed Feb 3, 2025
1 parent 3089362 commit 0d26c19
Show file tree
Hide file tree
Showing 12 changed files with 252 additions and 68 deletions.
3 changes: 3 additions & 0 deletions Epam.Kafka.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:Boolean x:Key="/Default/UserDictionary/Words/=Epam/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Gapless/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=idemp/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Kerberos/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=librdkafka/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=OAUTHBEARER/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Partitioner/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=rebalance/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=rebalances/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Sasl/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Unsubscriber/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
6 changes: 5 additions & 1 deletion src/Epam.Kafka/Metrics/CommonMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ namespace Epam.Kafka.Metrics;

internal abstract class CommonMetrics : StatisticsMetrics
{
protected override void Initialize(Meter meter, Meter topParMeter, Meter transactionMeter)
protected override IEnumerable<Meter> Initialize(KeyValuePair<string, object?>[] topLevelTags)
{
Meter meter = new(Statistics.TopLevelMeterName, null, topLevelTags);

this.CreateCounter(meter, "epam_kafka_stats_trx_msgs", this.GetTxRxMsg,
description: "Number of messages consumed or produced.");

Expand All @@ -19,6 +21,8 @@ protected override void Initialize(Meter meter, Meter topParMeter, Meter transac

this.CreateGauge(meter, "epam_kafka_stats_age", v => v.AgeMicroseconds / 1000000, "seconds",
"Time since this client instance was created (seconds).");

yield return meter;
}

protected abstract long GetTxRxMsg(Statistics value);
Expand Down
101 changes: 93 additions & 8 deletions src/Epam.Kafka/Metrics/ConsumerMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ namespace Epam.Kafka.Metrics;

internal sealed class ConsumerMetrics : CommonMetrics
{
private const string DesiredTagName = "Desired";
private const string FetchTagName = "Fetch";
private const string StateTagName = "State";
private const string ReasonTagName = "Reason";
private const string TopicTagName = "Topic";
private const string PartitionTagName = "Partition";
private const string ConsumerGroupTagName = "Group";
Expand All @@ -23,10 +23,96 @@ public ConsumerMetrics(ConsumerConfig config)
this._config = config ?? throw new ArgumentNullException(nameof(config));
}

protected override void Initialize(Meter meter, Meter topParMeter, Meter transactionMeter)
protected override IEnumerable<Meter> Initialize(KeyValuePair<string, object?>[] topLevelTags)
{
base.Initialize(meter, topParMeter, transactionMeter);
foreach (Meter m in base.Initialize(topLevelTags))
{
yield return m;
}

KeyValuePair<string, object?>[] tags = topLevelTags.Concat(new[]
{
new KeyValuePair<string, object?>(ConsumerGroupTagName, this._config.GroupId)
}).ToArray();

Meter cgMeter = new(Statistics.ConsumerGroupMeterName, null, tags);

this.ConfigureCgMeter(cgMeter);

yield return cgMeter;

Meter topParMeter = new(Statistics.TopicPartitionMeterName, null, tags);

this.ConfigureTopParMeter(topParMeter);

yield return topParMeter;
}

private void ConfigureCgMeter(Meter cgMeter)
{
cgMeter.CreateObservableGauge("epam_kafka_stats_cg_state_age", () =>
{
Statistics? v = this.Value;

if (v != null)
{
return Enumerable.Repeat(new Measurement<long>(v.ConsumerGroup.StateAgeMilliseconds / 1000,
new[]
{
new KeyValuePair<string, object?>(StateTagName, v.ConsumerGroup.State)
}), 1);
}

return Empty;
}, "seconds", "Consumer group handler state age seconds");

cgMeter.CreateObservableGauge("epam_kafka_stats_cg_rebalance_age", () =>
{
Statistics? v = this.Value;

if (v != null)
{
return Enumerable.Repeat(new Measurement<long>(v.ConsumerGroup.RebalanceAgeMilliseconds / 1000,
new[]
{
new KeyValuePair<string, object?>(ReasonTagName, v.ConsumerGroup.RebalanceReason)
}), 1);
}

return Empty;
}, "seconds", "Time elapsed since last rebalance seconds");

cgMeter.CreateObservableCounter("epam_kafka_stats_cg_rebalance_count", () =>
{
Statistics? v = this.Value;

if (v != null)
{
return Enumerable.Repeat(new Measurement<long>(v.ConsumerGroup.RebalanceCount), 1);
}

return Empty;
}, null, "Total number of rebalances");

cgMeter.CreateObservableGauge("epam_kafka_stats_cg_assignment_count", () =>
{
Statistics? v = this.Value;

if (v != null)
{
return Enumerable.Repeat(new Measurement<long>(v.ConsumerGroup.AssignmentCount,
new[]
{
new KeyValuePair<string, object?>(StateTagName, v.ConsumerGroup.JoinState)
}), 1);
}

return Empty;
}, null, "Current assignment's partition count");
}

private void ConfigureTopParMeter(Meter topParMeter)
{
topParMeter.CreateObservableGauge("epam_kafka_stats_tp_lag", () =>
{
Statistics? v = this.Value;
Expand All @@ -35,15 +121,14 @@ protected override void Initialize(Meter meter, Meter topParMeter, Meter transac
{
return v.Topics
.SelectMany(p =>
p.Value.Partitions.Where(x => x.Key != PartitionStatistics.InternalUnassignedPartition)
p.Value.Partitions.Where(x =>
x.Key != PartitionStatistics.InternalUnassignedPartition &&
x.Value is { Desired: true, ConsumerLag: >= 0 })
.Select(x => new KeyValuePair<TopicStatistics, PartitionStatistics>(p.Value, x.Value)))
.Select(m => new Measurement<long>(m.Value.ConsumerLag, new[]
{
new KeyValuePair<string, object?>(DesiredTagName, m.Value.Desired),
new KeyValuePair<string, object?>(FetchTagName, m.Value.FetchState),
new KeyValuePair<string, object?>(TopicTagName, m.Key.Name),
new KeyValuePair<string, object?>(PartitionTagName, m.Value.Id),
new KeyValuePair<string, object?>(ConsumerGroupTagName, this._config.GroupId)
}));
}

Expand Down
45 changes: 35 additions & 10 deletions src/Epam.Kafka/Metrics/ProducerMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ namespace Epam.Kafka.Metrics;

internal sealed class ProducerMetrics : CommonMetrics
{
private const string EnqueueTagName = "Enqueue";
private const string TransactionTagName = "Transaction";
private const string StateTagName = "State";

Expand All @@ -19,29 +18,55 @@ public ProducerMetrics(ProducerConfig config)
this._config = config ?? throw new ArgumentNullException(nameof(config));
}

protected override void Initialize(Meter meter, Meter topParMeter, Meter transactionMeter)
#pragma warning disable CA2000 // Meter will be disposed in base class
protected override IEnumerable<Meter> Initialize(KeyValuePair<string, object?>[] topLevelTags)
{
base.Initialize(meter, topParMeter, transactionMeter);
foreach (Meter m in base.Initialize(topLevelTags))
{
yield return m;
}

Meter meter = new(Statistics.TransactionMeterName, null, topLevelTags);

transactionMeter.CreateObservableGauge("epam_kafka_stats_eos_txn_age", () =>
if (!string.IsNullOrWhiteSpace(this._config.TransactionalId))
{
meter.CreateObservableGauge("epam_kafka_stats_eos_txn_age", () =>
{
Statistics? v = this.Value;

if (v != null)
{
return Enumerable.Repeat(new Measurement<long>(v.ProducerTransaction.TransactionAgeMilliseconds / 1000,
new[]
{
new KeyValuePair<string, object?>(StateTagName, v.ProducerTransaction.TransactionState),
new KeyValuePair<string, object?>(TransactionTagName, this._config.TransactionalId)
}), 1);
}

return Empty;
}, "seconds", "Transaction state age seconds");
}

meter.CreateObservableGauge("epam_kafka_stats_eos_idemp_age", () =>
{
Statistics? v = this.Value;

if (v != null && !string.IsNullOrWhiteSpace(this._config.TransactionalId))
{
return Enumerable.Repeat(new Measurement<long>(v.ProducerTransaction.TransactionAgeMilliseconds / 1000,
return Enumerable.Repeat(new Measurement<long>(v.ProducerTransaction.IdempotentAgeMilliseconds / 1000,
new[]
{
new KeyValuePair<string, object?>(EnqueueTagName, v.ProducerTransaction.EnqAllowed),
new KeyValuePair<string, object?>(StateTagName, v.ProducerTransaction.TransactionState),
new KeyValuePair<string, object?>(TransactionTagName, this._config.TransactionalId)
new KeyValuePair<string, object?>(StateTagName, v.ProducerTransaction.IdempotentState),
}), 1);
}

return Empty;
}, "seconds", "Transactional producer state age seconds");
}
}, "seconds", "Idempotent state age seconds");

yield return meter;
}
#pragma warning restore CA2000 // Meter will be disposed in base class
protected override long GetTxRxMsg(Statistics value)
{
return value.TransmittedMessagesTotal;
Expand Down
32 changes: 19 additions & 13 deletions src/Epam.Kafka/Metrics/StatisticsMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ internal abstract class StatisticsMetrics : IObserver<Statistics>

private readonly object _syncObj = new();
private bool _initialized;
private Meter? _topLevelMeter;
private Meter? _topParMeter;
private Meter? _transactionMeter;
private readonly List<Meter> _meters = new();

protected Statistics? Value { get; private set; }
protected static IEnumerable<Measurement<long>> Empty { get; } = Enumerable.Empty<Measurement<long>>();
Expand All @@ -46,19 +44,18 @@ public void OnNext(Statistics value)
new KeyValuePair<string, object?>(TypeTag, value.Type),
};

this._topLevelMeter = new Meter(Statistics.TopLevelMeterName, null, topLevelTags);
this._topParMeter = new Meter(Statistics.TopicPartitionMeterName, null, topLevelTags);
this._transactionMeter = new Meter(Statistics.TransactionMeterName, null, topLevelTags);

this.Initialize(this._topLevelMeter, this._topParMeter, this._transactionMeter);

this._initialized = true;

foreach (Meter meter in this.Initialize(topLevelTags))
{
this._meters.Add(meter);
}
}
}
}
}

protected abstract void Initialize(Meter meter, Meter topParMeter, Meter transactionMeter);
protected abstract IEnumerable<Meter> Initialize(KeyValuePair<string, object?>[] topLevelTags);

public void OnError(Exception error)
{
Expand All @@ -67,9 +64,18 @@ public void OnError(Exception error)

public void OnCompleted()
{
this._topLevelMeter?.Dispose();
this._topParMeter?.Dispose();
this._transactionMeter?.Dispose();
foreach (Meter meter in this._meters)
{
#pragma warning disable CA1031 // don't prevent other meters dispose
try
{
meter.Dispose();
}
catch
{
}
#pragma warning restore CA1031
}
}

protected void CreateGauge(Meter meter, string name, Func<Statistics, long> factory, string? unit = null, string? description = null)
Expand Down
5 changes: 5 additions & 0 deletions src/Epam.Kafka/Statistics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ public class Statistics
/// </summary>
public const string TransactionMeterName = "Epam.Kafka.Statistics.Transaction";

/// <summary>
/// Name of <see cref="Meter"/> used to expose consumer group statistics.
/// </summary>
public const string ConsumerGroupMeterName = "Epam.Kafka.Statistics.ConsumerGroup";

/// <summary>
/// Create new instance of <see cref="Statistics"/> object from json representation.
/// </summary>
Expand Down
13 changes: 13 additions & 0 deletions src/Epam.Kafka/Stats/GroupStatistics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ public class GroupStatistics
[JsonPropertyName("stateage")]
public long StateAgeMilliseconds { get; set; }

/// <summary>
/// Current assignment's partition count.
/// </summary>
[JsonPropertyName("assignment_size")]
public long AssignmentCount { get; set; }

/// <summary>
/// Local consumer group handler's join state.
/// </summary>
Expand All @@ -38,4 +44,11 @@ public class GroupStatistics
/// </summary>
[JsonPropertyName("rebalance_cnt")]
public long RebalanceCount { get; set; }

/// <summary>
/// Last rebalance reason, or empty string.
/// </summary>
[JsonPropertyName("rebalance_reason")]
public string RebalanceReason { get; set; } = string.Empty;

}
10 changes: 4 additions & 6 deletions src/Epam.Kafka/TestMockCluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ public string BootstrapServers

Dictionary<Message<byte[], byte[]?>, DeliveryResult<byte[], byte[]?>> result = new(messages.Length);

using IAdminClient adminClient = producer.CreateDependentAdminClient();

adminClient.GetMetadata(topicName, TimeSpan.FromMilliseconds(DefaultTimeoutMs));

if (messages.Length > 0)
{
foreach (Message<byte[], byte[]?> m in messages)
Expand All @@ -125,12 +129,6 @@ public string BootstrapServers
throw new InvalidOperationException($"Produced {result.Count} of {messages.Length} messages.");
}
}
else
{
using IAdminClient adminClient = producer.CreateDependentAdminClient();

adminClient.GetMetadata(topicName, TimeSpan.FromMilliseconds(DefaultTimeoutMs));
}

return result;
}
Expand Down
18 changes: 7 additions & 11 deletions tests/Epam.Kafka.Tests/Epam.Kafka.Tests.csproj
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<None Remove="Data\ConsumerStat.json" />
</ItemGroup>

<ItemGroup>
<EmbeddedResource Include="Data\ConsumerStat.json" />
</ItemGroup>
<ItemGroup>
<EmbeddedResource Include="Data\ConsumerStat.json" />
</ItemGroup>


<ItemGroup>
Expand Down
Loading

0 comments on commit 0d26c19

Please sign in to comment.