Skip to content

Commit

Permalink
producer transaction metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
IharYakimush committed Jan 31, 2025
1 parent 6326eb4 commit 3089362
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 47 deletions.
2 changes: 1 addition & 1 deletion src/Epam.Kafka/Internals/KafkaFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public IProducer<TKey, TValue> CreateProducer<TKey, TValue>(ProducerConfig confi

try
{
producer = new(builder);
producer = new(builder, config);

logger.ProducerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue), oauthSet, logSet);
}
Expand Down
4 changes: 2 additions & 2 deletions src/Epam.Kafka/Internals/Observable/ObservableProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ internal class ObservableProducer<TKey, TValue> : ObservableClient, IProducer<TK
{
private readonly IProducer<TKey, TValue> _inner;

public ObservableProducer(ProducerBuilder<TKey, TValue> builder)
public ObservableProducer(ProducerBuilder<TKey, TValue> builder, ProducerConfig config)
{
if (builder == null) throw new ArgumentNullException(nameof(builder));

Expand All @@ -30,7 +30,7 @@ public ObservableProducer(ProducerBuilder<TKey, TValue> builder)
this.StatObservers = new List<IObserver<string>>();

#pragma warning disable CA2000 // unsubscribe not needed
this.Subscribe(new ProducerMetrics());
this.Subscribe(new ProducerMetrics(config));
#pragma warning restore CA2000
}
catch (InvalidOperationException)
Expand Down
2 changes: 1 addition & 1 deletion src/Epam.Kafka/Metrics/CommonMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Epam.Kafka.Metrics;

internal abstract class CommonMetrics : StatisticsMetrics
{
protected override void Initialize(Meter meter, Meter topParMeter)
protected override void Initialize(Meter meter, Meter topParMeter, Meter transactionMeter)
{
this.CreateCounter(meter, "epam_kafka_stats_trx_msgs", this.GetTxRxMsg,
description: "Number of messages consumed or produced.");
Expand Down
52 changes: 20 additions & 32 deletions src/Epam.Kafka/Metrics/ConsumerMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,11 @@ public ConsumerMetrics(ConsumerConfig config)
this._config = config ?? throw new ArgumentNullException(nameof(config));
}

protected override void Initialize(Meter meter, Meter topParMeter)
protected override void Initialize(Meter meter, Meter topParMeter, Meter transactionMeter)
{
base.Initialize(meter, topParMeter);
base.Initialize(meter, topParMeter, transactionMeter);

this.CreateTpGauge(topParMeter, "epam_kafka_stats_tp_lag",
m => m.Value.ConsumerLag, null, "Consumer lag");
}

protected override long GetTxRxMsg(Statistics value)
{
return value.ConsumedMessagesTotal;
}

protected override long GetTxRx(Statistics value)
{
return value.ConsumedRequestsTotal;
}

protected override long GetTxRxBytes(Statistics value)
{
return value.ConsumedBytesTotal;
}

private void CreateTpGauge(Meter meter, string name,
Func<KeyValuePair<TopicStatistics, PartitionStatistics>, long> factory,
string? unit = null,
string? description = null)
{
if (meter == null) throw new ArgumentNullException(nameof(meter));
if (name == null) throw new ArgumentNullException(nameof(name));

meter.CreateObservableGauge(name, () =>
topParMeter.CreateObservableGauge("epam_kafka_stats_tp_lag", () =>
{
Statistics? v = this.Value;

Expand All @@ -64,7 +37,7 @@ private void CreateTpGauge(Meter meter, string name,
.SelectMany(p =>
p.Value.Partitions.Where(x => x.Key != PartitionStatistics.InternalUnassignedPartition)
.Select(x => new KeyValuePair<TopicStatistics, PartitionStatistics>(p.Value, x.Value)))
.Select(m => new Measurement<long>(factory(m), new[]
.Select(m => new Measurement<long>(m.Value.ConsumerLag, new[]
{
new KeyValuePair<string, object?>(DesiredTagName, m.Value.Desired),
new KeyValuePair<string, object?>(FetchTagName, m.Value.FetchState),
Expand All @@ -75,6 +48,21 @@ private void CreateTpGauge(Meter meter, string name,
}

return Empty;
}, unit, description);
}, null, "Consumer lag");
}

protected override long GetTxRxMsg(Statistics value)
{
return value.ConsumedMessagesTotal;
}

protected override long GetTxRx(Statistics value)
{
return value.ConsumedRequestsTotal;
}

protected override long GetTxRxBytes(Statistics value)
{
return value.ConsumedBytesTotal;
}
}
37 changes: 37 additions & 0 deletions src/Epam.Kafka/Metrics/ProducerMetrics.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,46 @@
// Copyright © 2024 EPAM Systems

using System.Diagnostics.Metrics;

using Confluent.Kafka;

namespace Epam.Kafka.Metrics;

internal sealed class ProducerMetrics : CommonMetrics
{
private const string EnqueueTagName = "Enqueue";
private const string TransactionTagName = "Transaction";
private const string StateTagName = "State";

private readonly ProducerConfig _config;

public ProducerMetrics(ProducerConfig config)
{
this._config = config ?? throw new ArgumentNullException(nameof(config));
}

protected override void Initialize(Meter meter, Meter topParMeter, Meter transactionMeter)
{
base.Initialize(meter, topParMeter, transactionMeter);

transactionMeter.CreateObservableGauge("epam_kafka_stats_eos_txn_age", () =>
{
Statistics? v = this.Value;

if (v != null && !string.IsNullOrWhiteSpace(this._config.TransactionalId))
{
return Enumerable.Repeat(new Measurement<long>(v.ProducerTransaction.TransactionAgeMilliseconds / 1000,
new[]
{
new KeyValuePair<string, object?>(EnqueueTagName, v.ProducerTransaction.EnqAllowed),
new KeyValuePair<string, object?>(StateTagName, v.ProducerTransaction.TransactionState),
new KeyValuePair<string, object?>(TransactionTagName, this._config.TransactionalId)
}), 1);
}

return Empty;
}, "seconds", "Transactional producer state age seconds");
}

protected override long GetTxRxMsg(Statistics value)
{
Expand Down
7 changes: 5 additions & 2 deletions src/Epam.Kafka/Metrics/StatisticsMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ internal abstract class StatisticsMetrics : IObserver<Statistics>
private bool _initialized;
private Meter? _topLevelMeter;
private Meter? _topParMeter;
private Meter? _transactionMeter;

protected Statistics? Value { get; private set; }
protected static IEnumerable<Measurement<long>> Empty { get; } = Enumerable.Empty<Measurement<long>>();
Expand Down Expand Up @@ -47,16 +48,17 @@ public void OnNext(Statistics value)

this._topLevelMeter = new Meter(Statistics.TopLevelMeterName, null, topLevelTags);
this._topParMeter = new Meter(Statistics.TopicPartitionMeterName, null, topLevelTags);
this._transactionMeter = new Meter(Statistics.TransactionMeterName, null, topLevelTags);

this.Initialize(this._topLevelMeter, this._topParMeter);
this.Initialize(this._topLevelMeter, this._topParMeter, this._transactionMeter);

this._initialized = true;
}
}
}
}

protected abstract void Initialize(Meter meter, Meter topParMeter);
protected abstract void Initialize(Meter meter, Meter topParMeter, Meter transactionMeter);

public void OnError(Exception error)
{
Expand All @@ -67,6 +69,7 @@ public void OnCompleted()
{
this._topLevelMeter?.Dispose();
this._topParMeter?.Dispose();
this._transactionMeter?.Dispose();
}

protected void CreateGauge(Meter meter, string name, Func<Statistics, long> factory, string? unit = null, string? description = null)
Expand Down
9 changes: 7 additions & 2 deletions src/Epam.Kafka/Statistics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@ public class Statistics
public const string TopLevelMeterName = "Epam.Kafka.Statistics";

/// <summary>
/// Name of <see cref="Meter"/> used to expose top topic partition statistics.
/// Name of <see cref="Meter"/> used to expose topic partition statistics.
/// </summary>
public const string TopicPartitionMeterName = "Epam.Kafka.Statistics.TopicPartition";

/// <summary>
/// Name of <see cref="Meter"/> used to expose producer transaction statistics.
/// </summary>
public const string TransactionMeterName = "Epam.Kafka.Statistics.Transaction";

/// <summary>
/// Create new instance of <see cref="Statistics"/> object from json representation.
/// </summary>
Expand Down Expand Up @@ -189,5 +194,5 @@ public static Statistics FromJson(string json)
/// EOS / Idempotent producer state and metrics.
/// </summary>
[JsonPropertyName("eos")]
public GroupStatistics ProducerTransaction { get; } = new();
public TransactionStatistics ProducerTransaction { get; } = new();
}
3 changes: 2 additions & 1 deletion tests/Epam.Kafka.Tests/Epam.Kafka.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ namespace Epam.Kafka
{
public const string TopLevelMeterName = "Epam.Kafka.Statistics";
public const string TopicPartitionMeterName = "Epam.Kafka.Statistics.TopicPartition";
public const string TransactionMeterName = "Epam.Kafka.Statistics.Transaction";
public Statistics() { }
[System.Text.Json.Serialization.JsonPropertyName("age")]
public long AgeMicroseconds { get; set; }
Expand Down Expand Up @@ -92,7 +93,7 @@ namespace Epam.Kafka
[System.Text.Json.Serialization.JsonPropertyName("msg_size_max")]
public long ProducerQueueSizeMax { get; set; }
[System.Text.Json.Serialization.JsonPropertyName("eos")]
public Epam.Kafka.Stats.GroupStatistics ProducerTransaction { get; }
public Epam.Kafka.Stats.TransactionStatistics ProducerTransaction { get; }
[System.Text.Json.Serialization.JsonIgnore]
public string RawJson { get; }
[System.Text.Json.Serialization.JsonPropertyName("time")]
Expand Down
62 changes: 57 additions & 5 deletions tests/Epam.Kafka.Tests/MetricsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public MetricsTests(ITestOutputHelper output) : base(output)
}

[Fact]
public void CreateDefaultClientWithMetrics()
public async Task CreateDefaultClientWithMetrics()
{
MockCluster.AddMockCluster(this).WithClusterConfig(MockCluster.ClusterName)
.Configure(x => x.ClientConfig.StatisticsIntervalMs = 100);
Expand All @@ -31,20 +31,20 @@ public void CreateDefaultClientWithMetrics()

using IClient c1 = this.KafkaFactory.GetOrCreateClient();
Assert.NotNull(c1);
Task.Delay(200).Wait();
await Task.Delay(200);
ml.RecordObservableInstruments(this.Output);

ml.Results.Count.ShouldBe(4);

Task.Delay(1000).Wait();
await Task.Delay(1000);

ml.RecordObservableInstruments(this.Output);

ml.Results.Count.ShouldBe(4);
}

[Fact]
public void ConsumerTopParMetricsAssign()
public async Task ConsumerTopParMetricsAssign()
{
this.Services.AddKafka(false).WithTestMockCluster(MockCluster.ClusterName);

Expand All @@ -62,7 +62,7 @@ public void ConsumerTopParMetricsAssign()
}, MockCluster.ClusterName);

// No assigned topic partitions
Task.Delay(200).Wait();
await Task.Delay(200);
ml.RecordObservableInstruments();
ml.Results.Count.ShouldBe(0);

Expand All @@ -80,4 +80,56 @@ public void ConsumerTopParMetricsAssign()
ml.RecordObservableInstruments();
ml.Results.Count.ShouldBe(0);
}

[Fact]
public async Task ProducerTransaction()
{
this.Services.AddKafka(false).WithTestMockCluster(MockCluster.ClusterName);

using MeterHelper ml = new(Statistics.TransactionMeterName);
ml.RecordObservableInstruments();
ml.Results.Count.ShouldBe(0);

using IProducer<int, int> producer =
this.KafkaFactory.CreateProducer<int, int>(new ProducerConfig
{
TransactionalId = "qwe",
StatisticsIntervalMs = 100
}, MockCluster.ClusterName);

await Task.Delay(200);
ml.RecordObservableInstruments(this.Output);
ml.Results.Count.ShouldBe(1);
ml.Results.Keys.Single().ShouldContain("Type:producer-Enqueue:False-State:Init-Transaction:qwe");

// One 1 of 4 assigned
producer.InitTransactions(TimeSpan.FromSeconds(3));

await Task.Delay(200);
ml.RecordObservableInstruments(this.Output);
ml.Results.Count.ShouldBe(1);
ml.Results.Keys.Single().ShouldContain("Type:producer-Enqueue:False-State:Ready-Transaction:qwe");

producer.BeginTransaction();

await Task.Delay(200);
ml.RecordObservableInstruments(this.Output);
ml.Results.Count.ShouldBe(1);
ml.Results.Keys.Single().ShouldContain("Type:producer-Enqueue:True-State:InTransaction-Transaction:qwe");

await producer.ProduceAsync("test", new Message<int, int> { Key = 1, Value = 2 });
ml.RecordObservableInstruments(this.Output);

await Task.Delay(200);
ml.RecordObservableInstruments(this.Output);
ml.Results.Count.ShouldBe(1);
//ml.Results.Keys.Single().ShouldContain("Type:producer-Enqueue:False-State:Init-Transaction:qwe");

producer.CommitTransaction();

await Task.Delay(200);
ml.RecordObservableInstruments(this.Output);
ml.Results.Count.ShouldBe(1);
//ml.Results.Keys.Single().ShouldContain("Type:producer-Enqueue:False-State:Init-Transaction:qwe");
}
}
24 changes: 23 additions & 1 deletion tests/Epam.Kafka.Tests/StatisticsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void TopLevelMetricsTests()
using MeterHelper ml = new(Statistics.TopLevelMeterName);

ConsumerMetrics cm = new(new ConsumerConfig());
ProducerMetrics pm = new();
ProducerMetrics pm = new(new ProducerConfig());

cm.OnNext(new Statistics { ClientId = "c1", Name = "n1", Type = "c", ConsumedMessagesTotal = 123, OpsQueueCountGauge = 332 });
pm.OnNext(new Statistics { ClientId = "c1", Name = "n2", Type = "p", TransmittedMessagesTotal = 111 });
Expand Down Expand Up @@ -162,4 +162,26 @@ public void TopParMetricsTests()

cm.OnCompleted();
}

[Fact]
public void TransactionMetricsTests()
{
using MeterHelper ml = new(Statistics.TransactionMeterName);

ProducerMetrics cm = new(new ProducerConfig { TransactionalId = "qwe" });

Statistics statistics = new Statistics { ClientId = "c1", Name = "n1", Type = "c", ConsumedMessagesTotal = 123 };
statistics.ProducerTransaction.EnqAllowed = true;
statistics.ProducerTransaction.TransactionState = "test";
statistics.ProducerTransaction.TransactionAgeMilliseconds = 120000;

cm.OnNext(statistics);

ml.RecordObservableInstruments(this.Output);

ml.Results.Count.ShouldBe(1);
ml.Results["epam_kafka_stats_eos_txn_age_Handler:n1-Name:c1-Type:c-Enqueue:True-State:test-Transaction:qwe"].ShouldBe(120);

cm.OnCompleted();
}
}

0 comments on commit 3089362

Please sign in to comment.