Skip to content

Commit

Permalink
Mock broker #56 (#57)
Browse files Browse the repository at this point in the history
  • Loading branch information
IharYakimush authored Jan 16, 2025
1 parent da50776 commit 7ac066b
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 0 deletions.
25 changes: 25 additions & 0 deletions src/Epam.Kafka/KafkaBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
using Epam.Kafka.Options.Validation;

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Epam.Kafka;
Expand Down Expand Up @@ -64,6 +66,29 @@ public OptionsBuilder<KafkaClusterOptions> WithClusterConfig(string name)
return this.Services.AddOptions<KafkaClusterOptions>(name);
}

/// <summary>
/// EXPERIMENTAL API, SUBJECT TO CHANGE OR REMOVAL
/// Add <see cref="TestMockCluster"/> with logical name specified by <paramref name="name" />.
/// </summary>
/// <param name="name">The logical name.</param>
/// <param name="numBrokers">The number of brokers (default 1)</param>
/// <returns>Options builder for further configuration.</returns>
/// <exception cref="ArgumentNullException"></exception>
public OptionsBuilder<KafkaClusterOptions> WithTestMockCluster(string name, int numBrokers = 1)
{
if (name == null)
{
throw new ArgumentNullException(nameof(name));
}

this.Services.TryAddKeyedSingleton(name, (sp, _) => new TestMockCluster(numBrokers, sp.GetService<ILoggerFactory>()));

return this.Services.AddOptions<KafkaClusterOptions>(name).Configure<IServiceProvider>((options, sp) =>
{
options.ClientConfig.BootstrapServers = sp.GetRequiredKeyedService<TestMockCluster>(name).BootstrapServers;
});
}

/// <summary>
/// Add <see cref="KafkaConsumerOptions" /> with logical name specified by <paramref name="name" />.
/// </summary>
Expand Down
164 changes: 164 additions & 0 deletions src/Epam.Kafka/TestMockCluster.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright © 2024 EPAM Systems

using Confluent.Kafka;

using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

namespace Epam.Kafka;

/// <summary>
/// EXPERIMENTAL API, SUBJECT TO CHANGE OR REMOVAL
/// Provides mock Kafka cluster with a configurable number of brokers
/// that support a reasonable subset of Kafka protocol operations,
/// error injection, etc. Mock clusters provide localhost listeners that can be used as the bootstrap
/// servers.
/// </summary>
/// <remarks>
/// Currently supported functionality:
/// <list type="string">Producer</list>
/// <list type="string">Idempotent Producer</list>
/// <list type="string">Transactional Producer</list>
/// <list type="string">Low-level consumer</list>
/// <list type="string">High-level balanced consumer groups with offset commits</list>
/// <list type="string">Topic Metadata and auto creation</list>
/// <list type="string">Telemetry (KIP-714)</list>
/// </remarks>
public sealed class TestMockCluster : IDisposable
{
private readonly Lazy<IProducer<byte[], byte[]?>> _producerLazy;

private const int DefaultTimeoutMs = 5000;

private bool _disposed;

/// <summary>
/// Initialize the <see cref="TestMockCluster" /> instance.
/// </summary>
/// <param name="numBrokers">The number of brokers (default 1)</param>
/// <param name="loggerFactory">Optional logger factory</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="numBrokers"/> should be in range [1,3].</exception>
public TestMockCluster(int numBrokers = 1, ILoggerFactory? loggerFactory = null)
{
if (numBrokers <= 0) throw new ArgumentOutOfRangeException(nameof(numBrokers));
if (numBrokers > 3) throw new ArgumentOutOfRangeException(nameof(numBrokers));

ILogger logger = loggerFactory?.CreateLogger(typeof(TestMockCluster)) ?? NullLogger.Instance;

this._producerLazy =
new(() =>
{
var config = new ProducerConfig
{
AllowAutoCreateTopics = true,
BootstrapServers = "localhost:9200",
MessageTimeoutMs = DefaultTimeoutMs
};

config.Set("test.mock.num.brokers", $"{numBrokers}");

ProducerBuilder<byte[], byte[]?> pb = new(config);

pb.SetLogHandler((_, message) => logger.KafkaLogHandler(message));

return pb.Build();
}, LazyThreadSafetyMode.ExecutionAndPublication);
}

/// <summary>
/// Comma separated addresses of localhost listeners that can be used as the bootstrap
/// servers
/// </summary>
public string BootstrapServers
{
get
{
if (this._disposed)
{
throw new ObjectDisposedException(nameof(TestMockCluster));
}

using IAdminClient adminClient = this._producerLazy.Value.CreateDependentAdminClient();

Metadata metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(1));

return string.Join(", ", metadata.Brokers.Select(x => $"{x.Host}:{x.Port}"));
}
}

/// <summary>
/// Produce messages to desired topic. If topic not exists it will be created even if no messages provided.
/// </summary>
/// <param name="topicName">The topic name.</param>
/// <param name="messages">Messages to produce</param>
/// <returns>The <see cref="DeliveryResult{TKey,TValue}"/>.</returns>
public Dictionary<Message<byte[], byte[]?>, DeliveryResult<byte[], byte[]?>> SeedTopic(string topicName, params Message<byte[], byte[]?>[] messages)
{
if (topicName == null) throw new ArgumentNullException(nameof(topicName));
if (messages == null) throw new ArgumentNullException(nameof(messages));

if (this._disposed)
{
throw new ObjectDisposedException(nameof(TestMockCluster));
}

IProducer<byte[], byte[]?> producer = this._producerLazy.Value;

Dictionary<Message<byte[], byte[]?>, DeliveryResult<byte[], byte[]?>> result = new(messages.Length);

if (messages.Length > 0)
{
foreach (Message<byte[], byte[]?> m in messages)
{
producer.Produce(topicName, m, r => result[m] = r);
}

DateTime wait = DateTime.UtcNow.AddMilliseconds(DefaultTimeoutMs);

while (DateTime.UtcNow < wait && result.Count < messages.Length)
{
producer.Poll(TimeSpan.FromMilliseconds(DefaultTimeoutMs / 10));
}

if (result.Count != messages.Length)
{
throw new InvalidOperationException($"Produced {result.Count} of {messages.Length} messages.");
}
}
else
{
using IAdminClient adminClient = producer.CreateDependentAdminClient();

adminClient.GetMetadata(topicName, TimeSpan.FromMilliseconds(DefaultTimeoutMs));
}

return result;
}

/// <summary>
/// Creates depended admin client
/// </summary>
/// <returns></returns>
public IAdminClient CreateDependentAdminClient()
{
if (this._disposed)
{
throw new ObjectDisposedException(nameof(TestMockCluster));
}

return this._producerLazy.Value.CreateDependentAdminClient();
}

/// <summary>
/// Dispose mock cluster listener
/// </summary>
public void Dispose()
{
if (!this._disposed && this._producerLazy.IsValueCreated)
{
this._producerLazy.Value.Dispose();
}

this._disposed = true;
}
}
9 changes: 9 additions & 0 deletions tests/Epam.Kafka.Tests/Epam.Kafka.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace Epam.Kafka
public Microsoft.Extensions.Options.OptionsBuilder<Epam.Kafka.Options.KafkaConsumerOptions> WithConsumerConfig(string name) { }
public Epam.Kafka.KafkaBuilder WithDefaults(System.Action<Epam.Kafka.Options.KafkaFactoryOptions> configure) { }
public Microsoft.Extensions.Options.OptionsBuilder<Epam.Kafka.Options.KafkaProducerOptions> WithProducerConfig(string name) { }
public Microsoft.Extensions.Options.OptionsBuilder<Epam.Kafka.Options.KafkaClusterOptions> WithTestMockCluster(string name, int numBrokers = 1) { }
}
public static class KafkaClientExtensions
{
Expand Down Expand Up @@ -57,6 +58,14 @@ namespace Epam.Kafka
{
public static Epam.Kafka.KafkaBuilder AddKafka(this Microsoft.Extensions.DependencyInjection.IServiceCollection services, bool useConfiguration = true) { }
}
public sealed class TestMockCluster : System.IDisposable
{
public TestMockCluster(int numBrokers = 1, Microsoft.Extensions.Logging.ILoggerFactory? loggerFactory = null) { }
public string BootstrapServers { get; }
public Confluent.Kafka.IAdminClient CreateDependentAdminClient() { }
public void Dispose() { }
public System.Collections.Generic.Dictionary<Confluent.Kafka.Message<byte[], byte[]?>, Confluent.Kafka.DeliveryResult<byte[], byte[]?>> SeedTopic(string topicName, params Confluent.Kafka.Message<byte[], byte[]?>[] messages) { }
}
}
namespace Epam.Kafka.Options
{
Expand Down
24 changes: 24 additions & 0 deletions tests/Epam.Kafka.Tests/KafkaFactoryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using Epam.Kafka.Tests.Common;

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

using Moq;
Expand All @@ -21,6 +22,29 @@ public KafkaFactoryTests(ITestOutputHelper output) : base(output)
{
}

[Fact]
public void TestMockCluster()
{
this.Services.AddKafka().WithTestMockCluster("qwe");

TestMockCluster mockCluster = this.ServiceProvider.GetRequiredKeyedService<TestMockCluster>("qwe");

this.Output.WriteLine(mockCluster.BootstrapServers);

// create empty topic
mockCluster.SeedTopic("anyName1");

// create topic and produce message
mockCluster.SeedTopic("anyName2", new Message<byte[], byte[]?> { Key = Guid.NewGuid().ToByteArray() });

using IAdminClient adminClient = mockCluster.CreateDependentAdminClient();

foreach (var t in adminClient.GetMetadata(TimeSpan.FromSeconds(1)).Topics)
{
this.Output.WriteLine(t.Topic);
}
}

[Theory]
[InlineData(null, MockCluster.DefaultConsumerGroup)]
[InlineData(MockCluster.DefaultConsumer, MockCluster.DefaultConsumerGroup)]
Expand Down

0 comments on commit 7ac066b

Please sign in to comment.