Skip to content

Commit

Permalink
revert statistics and metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
IharYakimush committed Jul 11, 2024
1 parent f685a26 commit c95bd28
Show file tree
Hide file tree
Showing 22 changed files with 26 additions and 771 deletions.
8 changes: 2 additions & 6 deletions sample/Epam.Kafka.Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
using Epam.Kafka.Sample.Data;
using Epam.Kafka.Sample.Json;
using Epam.Kafka.Sample.Samples;
using Epam.Kafka.Stats;

using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
Expand All @@ -34,7 +34,7 @@ private static void Main(string[] args)
// view metrics in console for demo purposes only
services.AddOpenTelemetry()
.WithMetrics(mb => mb
.AddMeter(PipelineMonitor.StatusMeterName, PipelineMonitor.HealthMeterName, Statistics.MeterName)
.AddMeter(PipelineMonitor.StatusMeterName, PipelineMonitor.HealthMeterName)
.AddConsoleExporter());

KafkaBuilder kafkaBuilder = services.AddKafka()
Expand All @@ -48,10 +48,6 @@ private static void Main(string[] args)
// Also it is possible to run real kafka cluster in docker using provided 'docker-compose.yml' file.
options.ClientConfig.BootstrapServers = RunMockServer();

// Emit librdkafka statistics and expose it via metrics
options.ClientConfig.SetDotnetStatisticMetrics(true);
options.ClientConfig.StatisticsIntervalMs = 10_000;

}).WithHealthCheck().Configure(options =>
{
options.SkipAdminClient = true;
Expand Down
1 change: 1 addition & 0 deletions src/Epam.Kafka.PubSub/Epam.Kafka.PubSub.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="6.0.0" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="6.0.0" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net8.0'">
Expand Down
2 changes: 0 additions & 2 deletions src/Epam.Kafka/Epam.Kafka.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,12 @@ Simplify Confluent.Kafka usage by leveraging best patterns and practices:
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.4.0" />
<PackageReference Include="Confluent.SchemaRegistry" Version="2.4.0" />
<PackageReference Include="System.Text.Json" Version="8.0.4" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net6.0' or '$(TargetFramework)' == 'net462' or '$(TargetFramework)' == 'netstandard2.0'">
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="6.0.0" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net8.0'">
Expand Down
14 changes: 0 additions & 14 deletions src/Epam.Kafka/Internals/JsonContext.cs

This file was deleted.

8 changes: 2 additions & 6 deletions src/Epam.Kafka/Internals/KafkaFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,7 @@ public IConsumer<TKey, TValue> CreateConsumer<TKey, TValue>(ConsumerConfig confi

// Init logger category from config and remove key because it is not standard key and cause errors.
string logHandler = config.GetDotnetLoggerCategory();
bool metrics = config.GetDotnetStatisticMetrics();
resultConfig.Remove(KafkaConfigExtensions.DotnetLoggerCategoryKey);
resultConfig.Remove(KafkaConfigExtensions.DotnetStatisticMetricsKey);

var builder = new ConsumerBuilder<TKey, TValue>(config);

Expand Down Expand Up @@ -146,7 +144,7 @@ public IConsumer<TKey, TValue> CreateConsumer<TKey, TValue>(ConsumerConfig confi

try
{
IConsumer<TKey, TValue> consumer = new ObservableConsumer<TKey, TValue>(builder, metrics);
IConsumer<TKey, TValue> consumer = new ObservableConsumer<TKey, TValue>(builder);

fl.ConsumerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue));

Expand All @@ -173,9 +171,7 @@ public IProducer<TKey, TValue> CreateProducer<TKey, TValue>(ProducerConfig confi

// Init logger category from config and remove key because it is not standard key and cause errors.
string logHandler = config.GetDotnetLoggerCategory();
bool metrics = config.GetDotnetStatisticMetrics();
resultConfig.Remove(KafkaConfigExtensions.DotnetLoggerCategoryKey);
resultConfig.Remove(KafkaConfigExtensions.DotnetStatisticMetricsKey);

ProducerBuilder<TKey, TValue> builder = new(config);

Expand Down Expand Up @@ -204,7 +200,7 @@ public IProducer<TKey, TValue> CreateProducer<TKey, TValue>(ProducerConfig confi

try
{
ObservableProducer<TKey, TValue> producer = new(builder, metrics);
ObservableProducer<TKey, TValue> producer = new(builder);

fl.ProducerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue));

Expand Down
30 changes: 0 additions & 30 deletions src/Epam.Kafka/Internals/Metrics/ConsumerMetrics.cs

This file was deleted.

19 changes: 0 additions & 19 deletions src/Epam.Kafka/Internals/Metrics/ProducerMetrics.cs

This file was deleted.

53 changes: 0 additions & 53 deletions src/Epam.Kafka/Internals/Metrics/StatisticsMetrics.cs

This file was deleted.

36 changes: 6 additions & 30 deletions src/Epam.Kafka/Internals/Observable/ObservableClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,18 @@

using Confluent.Kafka;

using Epam.Kafka.Stats;

namespace Epam.Kafka.Internals.Observable;

internal abstract class ObservableClient : ClientWrapper, IObservable<Error>, IObservable<Statistics>
internal abstract class ObservableClient : ClientWrapper, IObservable<Error>, IObservable<string>
{
protected List<IObserver<Error>>? ErrorObservers { get; set; }
protected List<IObserver<Statistics>>? StatObservers { get; set; }
protected List<IObserver<string>>? StatObservers { get; set; }

protected void StatisticsHandler(string json)
{
// don't try to parse if no subscribers
if (this.StatObservers!.Count <= 0)
{
return;
}

Statistics value;

try
{
value = Statistics.FromJson(json);
}
catch (Exception e) when (e is ArgumentNullException or ArgumentException)
{
foreach (IObserver<Statistics> observer in this.StatObservers)
{
observer.OnError(e);
}

return;
}

foreach (IObserver<Statistics> observer in this.StatObservers)
foreach (IObserver<string> observer in this.StatObservers!)
{
observer.OnNext(value);
observer.OnNext(json);
}
}

Expand Down Expand Up @@ -89,7 +65,7 @@ public IDisposable Subscribe(IObserver<Error> observer)
return new Unsubscriber<Error>(this.ErrorObservers, observer);
}

public IDisposable Subscribe(IObserver<Statistics> observer)
public IDisposable Subscribe(IObserver<string> observer)
{
if (this.StatObservers == null)
{
Expand All @@ -102,7 +78,7 @@ public IDisposable Subscribe(IObserver<Statistics> observer)
this.StatObservers.Add(observer);
}

return new Unsubscriber<Statistics>(this.StatObservers, observer);
return new Unsubscriber<string>(this.StatObservers, observer);
}

private class Unsubscriber<T> : IDisposable
Expand Down
17 changes: 2 additions & 15 deletions src/Epam.Kafka/Internals/Observable/ObservableConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,13 @@

using Confluent.Kafka;

using Epam.Kafka.Internals.Metrics;
using Epam.Kafka.Stats;

using System.Diagnostics.Metrics;

namespace Epam.Kafka.Internals.Observable;

internal class ObservableConsumer<TKey, TValue> : ObservableClient, IConsumer<TKey, TValue>
{
private readonly IConsumer<TKey, TValue> _inner;
private readonly Meter? _meter;

public ObservableConsumer(ConsumerBuilder<TKey, TValue> builder, bool metrics)
public ObservableConsumer(ConsumerBuilder<TKey, TValue> builder)
{
if (builder == null) throw new ArgumentNullException(nameof(builder));

Expand All @@ -31,12 +25,7 @@ public ObservableConsumer(ConsumerBuilder<TKey, TValue> builder, bool metrics)
try
{
builder.SetStatisticsHandler((_, json) => this.StatisticsHandler(json));
this.StatObservers = new List<IObserver<Statistics>>();
if (metrics)
{
this._meter = new Meter(Statistics.MeterName);
this.StatObservers.Add(new ConsumerMetrics(this._meter));
}
this.StatObservers = new List<IObserver<string>>();
}
catch (InvalidOperationException)
{
Expand All @@ -57,8 +46,6 @@ public override void Dispose()
finally
{
this.ClearObservers();

this._meter?.Dispose();
}
}

Expand Down
17 changes: 2 additions & 15 deletions src/Epam.Kafka/Internals/Observable/ObservableProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,13 @@

using Confluent.Kafka;

using Epam.Kafka.Internals.Metrics;
using Epam.Kafka.Stats;

using System.Diagnostics.Metrics;

namespace Epam.Kafka.Internals.Observable;

internal class ObservableProducer<TKey, TValue> : ObservableClient, IProducer<TKey, TValue>
{
private readonly IProducer<TKey, TValue> _inner;
private readonly Meter? _meter;

public ObservableProducer(ProducerBuilder<TKey, TValue> builder, bool metrics)
public ObservableProducer(ProducerBuilder<TKey, TValue> builder)
{
if (builder == null) throw new ArgumentNullException(nameof(builder));

Expand All @@ -31,12 +25,7 @@ public ObservableProducer(ProducerBuilder<TKey, TValue> builder, bool metrics)
try
{
builder.SetStatisticsHandler((_, json) => this.StatisticsHandler(json));
this.StatObservers = new List<IObserver<Statistics>>();
if (metrics)
{
this._meter = new Meter(Statistics.MeterName);
this.StatObservers.Add(new ProducerMetrics(this._meter));
}
this.StatObservers = new List<IObserver<string>>();
}
catch (InvalidOperationException)
{
Expand All @@ -57,8 +46,6 @@ public override void Dispose()
finally
{
this.ClearObservers();

this._meter?.Dispose();
}
}

Expand Down
8 changes: 3 additions & 5 deletions src/Epam.Kafka/Internals/SharedClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@

using Confluent.Kafka;

using Epam.Kafka.Stats;

namespace Epam.Kafka.Internals;

internal sealed class SharedClient : ClientWrapper, IObservable<Error>, IObservable<Statistics>
internal sealed class SharedClient : ClientWrapper, IObservable<Error>, IObservable<string>
{
public const string ProducerName = "Shared";

Expand Down Expand Up @@ -42,8 +40,8 @@ public IDisposable Subscribe(IObserver<Error> observer)
return ((IObservable<Error>)this._client).Subscribe(observer);
}

public IDisposable Subscribe(IObserver<Statistics> observer)
public IDisposable Subscribe(IObserver<string> observer)
{
return ((IObservable<Statistics>)this._client).Subscribe(observer);
return ((IObservable<string>)this._client).Subscribe(observer);
}
}
Loading

0 comments on commit c95bd28

Please sign in to comment.