Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test improvements #22

Merged
merged 4 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 28 additions & 40 deletions src/Epam.Kafka/Internals/KafkaFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@
using Epam.Kafka.Options;

using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;

namespace Epam.Kafka.Internals;

internal sealed class KafkaFactory : IKafkaFactory, IDisposable
{
private const string DefaultLogHandler = ".DefaultLogHandler";
private const string Factory = ".Factory";
private const string LoggerCategoryName = "Epam.Kafka.Factory";

private readonly Dictionary<KafkaClusterOptions, AdminClient> _clients = new();
private readonly IOptionsMonitor<KafkaClusterOptions> _clusterOptions;
private readonly IOptionsMonitor<KafkaConsumerOptions> _consumerOptions;
private readonly ILoggerFactory? _loggerFactory;
private readonly ILoggerFactory _loggerFactory;
private readonly IOptionsMonitor<KafkaProducerOptions> _producerOptions;
private readonly Dictionary<KafkaClusterOptions, CachedSchemaRegistryClient> _registries = new();
private readonly object _syncObj = new();
Expand All @@ -36,7 +36,7 @@ public KafkaFactory(
this._clusterOptions = clusterOptions ?? throw new ArgumentNullException(nameof(clusterOptions));
this._consumerOptions = consumerOptions ?? throw new ArgumentNullException(nameof(consumerOptions));
this._producerOptions = producerOptions ?? throw new ArgumentNullException(nameof(producerOptions));
this._loggerFactory = loggerFactory;
this._loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
}

public void Dispose()
Expand Down Expand Up @@ -111,8 +111,7 @@ public IConsumer<TKey, TValue> CreateConsumer<TKey, TValue>(ConsumerConfig confi
config = new ConsumerConfig(resultConfig);

// Init logger category from config and remove key because it is not standard key and cause errors.
string logHandler = config.GetDotnetLoggerCategory() + DefaultLogHandler;
ILogger? fl = this._loggerFactory?.CreateLogger(config.GetDotnetLoggerCategory() + Factory);
string logHandler = config.GetDotnetLoggerCategory();
resultConfig.Remove(KafkaConfigExtensions.DotnetLoggerCategoryKey);

var builder = new ConsumerBuilder<TKey, TValue>(config);
Expand All @@ -130,30 +129,27 @@ public IConsumer<TKey, TValue> CreateConsumer<TKey, TValue>(ConsumerConfig confi
} // handler already set
}

if (this._loggerFactory != null)
try
{
ILogger logger = this._loggerFactory.CreateLogger(logHandler);

try
{
builder.SetLogHandler((_, m) => logger.KafkaLogHandler(m));
}
catch (InvalidOperationException)
{
} // handler already set
builder.SetLogHandler((_, m) => this._loggerFactory.CreateLogger(logHandler).KafkaLogHandler(m));
}
catch (InvalidOperationException)
{
} // handler already set

ILogger fl = this._loggerFactory.CreateLogger(LoggerCategoryName);

try
{
IConsumer<TKey, TValue> consumer = builder.Build();

fl?.ConsumerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue));
fl.ConsumerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue));

return consumer;
}
catch (Exception exc)
{
fl?.ConsumerCreateError(exc, PrepareConfigForLogs(config), typeof(TKey), typeof(TValue));
fl.ConsumerCreateError(exc, PrepareConfigForLogs(config), typeof(TKey), typeof(TValue));

throw;
}
Expand All @@ -168,12 +164,12 @@ public IProducer<TKey, TValue> CreateProducer<TKey, TValue>(ProducerConfig confi

Dictionary<string, string> resultConfig = MergeResultConfig(clusterOptions, config);

string logHandler = config.GetDotnetLoggerCategory() + DefaultLogHandler;
ILogger? fl = this._loggerFactory?.CreateLogger(config.GetDotnetLoggerCategory() + Factory);
resultConfig.Remove(KafkaConfigExtensions.DotnetLoggerCategoryKey);

config = new ProducerConfig(resultConfig);

// Init logger category from config and remove key because it is not standard key and cause errors.
string logHandler = config.GetDotnetLoggerCategory();
resultConfig.Remove(KafkaConfigExtensions.DotnetLoggerCategoryKey);

ProducerBuilder<TKey, TValue> builder = new(config);

configure?.Invoke(builder);
Expand All @@ -188,31 +184,27 @@ public IProducer<TKey, TValue> CreateProducer<TKey, TValue>(ProducerConfig confi
{
} // handler already set
}

if (this._loggerFactory != null)
try
{
ILogger logger = this._loggerFactory.CreateLogger(logHandler);

try
{
builder.SetLogHandler((_, m) => logger.KafkaLogHandler(m));
}
catch (InvalidOperationException)
{
} // handler already set
builder.SetLogHandler((_, m) => this._loggerFactory.CreateLogger(logHandler).KafkaLogHandler(m));
}
catch (InvalidOperationException)
{
} // handler already set

ILogger fl = this._loggerFactory.CreateLogger(LoggerCategoryName);

try
{
IProducer<TKey, TValue> producer = builder.Build();

fl?.ProducerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue));
fl.ProducerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue));

return producer;
}
catch (Exception exc)
{
fl?.ProducerCreateError(exc, PrepareConfigForLogs(config), typeof(TKey), typeof(TValue));
fl.ProducerCreateError(exc, PrepareConfigForLogs(config), typeof(TKey), typeof(TValue));

throw;
}
Expand Down Expand Up @@ -279,11 +271,7 @@ private static IEnumerable<KeyValuePair<string, string>> PrepareConfigForLogs(Co

static bool Contains(KeyValuePair<string, string> x, string value)
{
return x.Key.Contains(value
#if NET6_0_OR_GREATER
, StringComparison.Ordinal
#endif
);
return x.Key.IndexOf(value, StringComparison.OrdinalIgnoreCase) > -1;
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/Epam.Kafka/KafkaConfigExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static class KafkaConfigExtensions
/// Default <see cref="IKafkaFactory"/> implementation use it only for logger configuration and don't pass it to producer or consumer builder to avoid errors.
/// </remarks>
public const string DotnetLoggerCategoryKey = "dotnet.logger.category";
private const string DotnetLoggerCategoryDefault = "Epam.Kafka";
private const string DotnetLoggerCategoryDefault = "Epam.Kafka.DefaultLogHandler";

/// <summary>
/// Read and return 'dotnet.logger.category' value if it exists, default value 'Epam.Kafka.DefaultLogHandler' otherwise.
Expand Down
2 changes: 1 addition & 1 deletion src/Epam.Kafka/LogExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public static void KafkaLogHandler(this ILogger logger, LogMessage msg)
[LoggerMessage(
EventId = 300,
EventName = "KafkaLogHandler",
Message = "KafkaLogHandler {Facility} {ClientName} {Msg}.")]
Message = "{Facility} {ClientName} {Msg}.")]
static partial void KafkaLogHandler(this ILogger logger, LogLevel level, string clientName, string facility,
string msg);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ namespace Epam.Kafka.PubSub.IntegrationTests;
public static class IntegrationTestsExtensions
{
public static SubscriptionBuilder<string, TestEntityKafka, TestSubscriptionHandler> CreateDefaultSubscription(
this TestObserver observer, MockCluster mockCluster, AutoOffsetReset autoOffsetReset = AutoOffsetReset.Earliest)
this TestObserver observer, MockCluster mockCluster,
AutoOffsetReset autoOffsetReset = AutoOffsetReset.Earliest,
PartitionAssignmentStrategy assignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky)
{
if (observer == null) throw new ArgumentNullException(nameof(observer));
if (mockCluster == null) throw new ArgumentNullException(nameof(mockCluster));
Expand All @@ -26,6 +28,7 @@ public static SubscriptionBuilder<string, TestEntityKafka, TestSubscriptionHandl
x.ConsumerConfig.SessionTimeoutMs = 10_000;
x.ConsumerConfig.AutoOffsetReset = autoOffsetReset;
x.ConsumerConfig.SetCancellationDelayMaxMs(2000);
x.ConsumerConfig.PartitionAssignmentStrategy = assignmentStrategy;
});

return kafkaBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ public ReadTests(ITestOutputHelper output, MockCluster mockCluster) : base(outpu
this._mockCluster = mockCluster ?? throw new ArgumentNullException(nameof(mockCluster));
}

[Fact]
public async Task OneBatchTwoPartitions()
[Theory]
[InlineData(PartitionAssignmentStrategy.CooperativeSticky)]
[InlineData(PartitionAssignmentStrategy.Range)]
[InlineData(PartitionAssignmentStrategy.RoundRobin)]
public async Task OneBatchTwoPartitions(PartitionAssignmentStrategy assignmentStrategy)
{
TopicPartition tp1 = new(this.AnyTopicName, 1);
TopicPartition tp2 = new(this.AnyTopicName, 2);
Expand All @@ -35,7 +38,8 @@ public async Task OneBatchTwoPartitions()
this.Services.AddScoped(_ => handler);
this.Services.AddScoped(_ => offsets);

observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer)
observer.CreateDefaultSubscription(this._mockCluster,assignmentStrategy: assignmentStrategy)
.WithValueDeserializer(_ => deserializer)
.WithSubscribeAndExternalOffsets<TestOffsetsStorage>();

Dictionary<TestEntityKafka, TopicPartitionOffset> m1 = await MockCluster.SeedKafka(this, 5, tp1);
Expand Down
50 changes: 50 additions & 0 deletions tests/Epam.Kafka.Tests/Common/CollectionLoggerProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright © 2024 EPAM Systems

using Microsoft.Extensions.Logging;

namespace Epam.Kafka.Tests.Common;

/// <summary>
/// Test helper <see cref="ILoggerProvider"/> that captures formatted log messages
/// in memory, grouped by logger category name, so tests can assert on log output
/// (e.g. that secrets are masked in factory logs).
/// </summary>
public sealed class CollectionLoggerProvider : ILoggerProvider
{
    // Guards Entries and every per-category list. This provider is attached to
    // Kafka client factories whose log callbacks may arrive on non-test threads
    // (NOTE(review): librdkafka typically logs from background threads — confirm),
    // while the test thread reads Entries; Dictionary/List are not thread-safe.
    private readonly object _gate = new();

    /// <summary>Captured formatted messages, keyed by logger category name.</summary>
    public Dictionary<string, List<string>> Entries { get; } = new();

    public void Dispose()
    {
        // Nothing to release: the provider holds only managed in-memory state.
    }

    public ILogger CreateLogger(string categoryName)
    {
        lock (this._gate)
        {
            // One shared list per category; loggers created for the same
            // category append to the same list.
            if (!this.Entries.TryGetValue(categoryName, out List<string>? list))
            {
                list = new List<string>();
                this.Entries[categoryName] = list;
            }

            return new Logger(list, this._gate);
        }
    }

    private sealed class Logger : ILogger
    {
        private readonly List<string> _entries;
        private readonly object _gate;

        public Logger(List<string> entries, object gate)
        {
            this._entries = entries;
            this._gate = gate;
        }

        public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func<TState, Exception?, string> formatter)
        {
            // Store only the formatted text; tests assert on message content.
            lock (this._gate)
            {
                this._entries.Add(formatter(state, exception));
            }
        }

        public bool IsEnabled(LogLevel logLevel)
        {
            // Capture everything; filtering is done by assertions in tests.
            return true;
        }

        public IDisposable? BeginScope<TState>(TState state) where TState : notnull
        {
            // Scopes are not tracked by this test helper.
            return null;
        }
    }
}
2 changes: 1 addition & 1 deletion tests/Epam.Kafka.Tests/KafkaConfigExtensionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void LoggerCategory()
ConsumerConfig cfg = new ConsumerConfig();
string defValue = cfg.GetDotnetLoggerCategory();

Assert.Equal("Epam.Kafka", defValue);
Assert.Equal("Epam.Kafka.DefaultLogHandler", defValue);

cfg.SetDotnetLoggerCategory("qwe");
Assert.Equal("qwe", cfg.GetDotnetLoggerCategory());
Expand Down
43 changes: 38 additions & 5 deletions tests/Epam.Kafka.Tests/KafkaFactoryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@
using Confluent.Kafka;

using Epam.Kafka.Tests.Common;

using Microsoft.Extensions.Logging;
using Shouldly;

using System.Linq;
using System.Xml.Linq;

using Xunit;
using Xunit.Abstractions;

Expand Down Expand Up @@ -110,7 +107,8 @@ public void CreateProducerConfig(string? name, string expectedGroup)
[Fact]
public void CreateConfigsWithPlaceholders()
{
KafkaBuilder kafkaBuilder = MockCluster.AddMockCluster(this).WithDefaults(x =>
MockCluster.AddMockCluster(this)
.WithDefaults(x =>
{
x.Producer = "placeholder";
x.Consumer = "placeholder";
Expand Down Expand Up @@ -221,4 +219,39 @@ public void CreateDefaultClients()
IAdminClient dc = c1.CreateDependentAdminClient();
Assert.NotNull(dc);
}

[Fact]
public void ConfigSecretsInLogError()
{
    // Capture log output so the masked secret can be asserted on.
    var provider = new CollectionLoggerProvider();
    this.LoggingBuilder.AddProvider(provider);

    MockCluster.AddMockCluster(this);

    // Non-standard config key: producer creation is expected to fail.
    ProducerConfig config = new();
    config.Set("Sasl.OAuthBearer.Client.Secret", "anyValue");

    Assert.Throws<InvalidOperationException>(() => this.KafkaFactory.CreateProducer<string, string>(config));

    // Exactly one category with exactly one entry, and the secret value is masked.
    string entry = provider.Entries.ShouldHaveSingleItem().Value.ShouldHaveSingleItem();
    entry.ShouldContain("[Sasl.OAuthBearer.Client.Secret, *******]");
}

[Fact]
public void ConfigSecretsInLog()
{
    // Capture log output so masking and category routing can be asserted on.
    var provider = new CollectionLoggerProvider();
    this.LoggingBuilder.AddProvider(provider);

    MockCluster.AddMockCluster(this);

    ProducerConfig config = new()
    {
        SaslOauthbearerClientSecret = "anyValue",
        Debug = "all",
    };
    config.SetDotnetLoggerCategory("Qwe");

    IProducer<string, string> producer = this.KafkaFactory.CreateProducer<string, string>(config);

    // Factory log masks the secret value; debug output lands in the configured category.
    provider.Entries["Epam.Kafka.Factory"].ShouldHaveSingleItem().ShouldContain("[sasl.oauthBearer.client.secret, *******]");
    provider.Entries["Qwe"].ShouldNotBeEmpty();
}
}