Optionally throw exception if OAuth handler already set and callback … #45

Merged
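This PR adds an optional `throwIfAlreadySet` flag to `KafkaClusterOptions.WithOAuthHandler`; when the flag is enabled, `KafkaFactory` rethrows the `InvalidOperationException` raised by `SetOAuthBearerTokenRefreshHandler` instead of silently swallowing it. A minimal sketch of the opt-in behaviour, paraphrasing the `CreateOauthConsumerThrow` test added in this PR (the cluster/config names and the placeholder handler body are illustrative only):

```csharp
// Register a default OAuth handler on the cluster options and opt in to the new flag.
// A real handler would return an OAuthRefreshResult built from the
// 'sasl.oauthbearer.config' value it receives.
kafkaBuilder.WithClusterConfig(MockCluster.ClusterName)
    .Configure(options => options.WithOAuthHandler(
        _ => throw new NotImplementedException("build and return an OAuthRefreshResult"),
        throwIfAlreadySet: true));

ConsumerConfig config = this.KafkaFactory.CreateConsumerConfig("any");

// Because the flag is set, creating a consumer that already registered its own OAuth
// callback now surfaces the conflict instead of silently keeping the custom handler.
Assert.Throws<InvalidOperationException>(() =>
    this.KafkaFactory.CreateConsumer<string, string>(config,
        configure: b => b.SetOAuthBearerTokenRefreshHandler((_, _) => { })));
```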
82 changes: 54 additions & 28 deletions src/Epam.Kafka/Internals/KafkaFactory.cs
@@ -121,41 +121,54 @@ public IConsumer<TKey, TValue> CreateConsumer<TKey, TValue>(ConsumerConfig confi

configure?.Invoke(builder);

bool oauthSet = false;
bool logSet = false;

try
{
builder.SetLogHandler((_, m) => this._loggerFactory.CreateLogger(logHandler).KafkaLogHandler(m));
logSet = true;
}
catch (InvalidOperationException)
{
// handler already set
}

if (clusterOptions is { OauthHandler: { }, ClientConfig.SaslMechanism: SaslMechanism.OAuthBearer })
{
try
{
builder.SetOAuthBearerTokenRefreshHandler(clusterOptions.OauthHandler.Invoke);
oauthSet = true;
}
catch (InvalidOperationException)
{
} // handler already set
// handler already set
if (clusterOptions.OauthHandlerThrow)
{
throw;
}
}
}

try
{
builder.SetLogHandler((_, m) => this._loggerFactory.CreateLogger(logHandler).KafkaLogHandler(m));
}
catch (InvalidOperationException)
{
} // handler already set
ILogger logger = this._loggerFactory.CreateLogger(LoggerCategoryName);

ILogger fl = this._loggerFactory.CreateLogger(LoggerCategoryName);
ObservableConsumer<TKey, TValue> consumer;

try
{
IConsumer<TKey, TValue> consumer = new ObservableConsumer<TKey, TValue>(builder);

fl.ConsumerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue));
consumer = new ObservableConsumer<TKey, TValue>(builder);

return consumer;
logger.ConsumerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue), oauthSet, logSet);
}
catch (Exception exc)
{
fl.ConsumerCreateError(exc, PrepareConfigForLogs(config), typeof(TKey), typeof(TValue));
logger.ConsumerCreateError(exc, PrepareConfigForLogs(config), typeof(TKey), typeof(TValue), oauthSet, logSet);

throw;
}

return consumer;
}

public IProducer<TKey, TValue> CreateProducer<TKey, TValue>(ProducerConfig config, string? cluster = null,
@@ -177,41 +190,54 @@ public IProducer<TKey, TValue> CreateProducer<TKey, TValue>(ProducerConfi

configure?.Invoke(builder);

bool oauthSet = false;
bool logSet = false;

try
{
builder.SetLogHandler((_, m) => this._loggerFactory.CreateLogger(logHandler).KafkaLogHandler(m));
logSet = true;
}
catch (InvalidOperationException)
{
// handler already set
}

if (clusterOptions is { OauthHandler: { }, ClientConfig.SaslMechanism: SaslMechanism.OAuthBearer })
{
try
{
builder.SetOAuthBearerTokenRefreshHandler(clusterOptions.OauthHandler);
oauthSet = true;
}
catch (InvalidOperationException)
{
} // handler already set
// handler already set
if (clusterOptions.OauthHandlerThrow)
{
throw;
}
}
}

try
{
builder.SetLogHandler((_, m) => this._loggerFactory.CreateLogger(logHandler).KafkaLogHandler(m));
}
catch (InvalidOperationException)
{
} // handler already set
ILogger logger = this._loggerFactory.CreateLogger(LoggerCategoryName);

ILogger fl = this._loggerFactory.CreateLogger(LoggerCategoryName);
ObservableProducer<TKey, TValue> producer;

try
{
ObservableProducer<TKey, TValue> producer = new(builder);

fl.ProducerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue));
producer = new(builder);

return producer;
logger.ProducerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue), oauthSet, logSet);
}
catch (Exception exc)
{
fl.ProducerCreateError(exc, PrepareConfigForLogs(config), typeof(TKey), typeof(TValue));
logger.ProducerCreateError(exc, PrepareConfigForLogs(config), typeof(TKey), typeof(TValue), oauthSet, logSet);

throw;
}

return producer;
}

public IClient GetOrCreateClient(string? cluster = null)
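For contrast with the factory changes above: under the default `throwIfAlreadySet: false`, an OAuth callback supplied through the `configure` delegate simply takes precedence, and the factory only records whether its default handlers were applied (the new `oauthSet`/`logSet` flags) so the create log entries can report it. A short sketch mirroring the `CreateOauthConsumerCustom` test added later in this PR (mock-cluster setup omitted):

```csharp
// Default behaviour: the callback registered via `configure` wins. If the cluster options
// also define a default OAuth handler, the factory's catch block swallows the
// InvalidOperationException and the ConsumerCreated log entry reports
// "Default OAuth Handler: False".
IConsumer<string, string> consumer = this.KafkaFactory.CreateConsumer<string, string>(config,
    configure: b => b.SetOAuthBearerTokenRefreshHandler((_, _) => { /* custom token refresh */ }));

consumer.Consume(1000); // triggers the custom OAuth callback against the mock cluster
```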
16 changes: 8 additions & 8 deletions src/Epam.Kafka/LogExtensions.cs
@@ -58,33 +58,33 @@ static partial void KafkaLogHandler(this ILogger logger, LogLevel level, string
EventId = 201,
EventName = "ConsumerCreated",
Level = LogLevel.Information,
Message = "Consumer created. KeyType: {KeyType}, ValueType: {ValueType}, Config: {Config}.")]
Message = "Consumer created. KeyType: {KeyType}, ValueType: {ValueType}, Default OAuth Handler: {OauthHandler}, Default Log Handler: {LogHandler}, Config: {Config}.")]
internal static partial void ConsumerCreateOk(this ILogger logger, IEnumerable<KeyValuePair<string, string>> config,
Type keyType, Type valueType);
Type keyType, Type valueType, bool oauthHandler, bool logHandler);

[LoggerMessage(
EventId = 501,
EventName = "ConsumerCreateError",
Level = LogLevel.Error,
Message = "Consumer create error. KeyType: {KeyType}, ValueType: {ValueType}, Config: {Config}.")]
Message = "Consumer create error. KeyType: {KeyType}, ValueType: {ValueType}, Default OAuth Handler: {OauthHandler}, Default Log Handler: {LogHandler}, Config: {Config}.")]
internal static partial void ConsumerCreateError(this ILogger logger, Exception exception,
IEnumerable<KeyValuePair<string, string>> config, Type keyType, Type valueType);
IEnumerable<KeyValuePair<string, string>> config, Type keyType, Type valueType, bool oauthHandler, bool logHandler);

[LoggerMessage(
EventId = 202,
EventName = "ProducerCreated",
Level = LogLevel.Information,
Message = "Producer created. KeyType: {KeyType}, ValueType: {ValueType}, Config: {Config}.")]
Message = "Producer created. KeyType: {KeyType}, ValueType: {ValueType}, Default OAuth Handler: {OauthHandler}, Default Log Handler: {LogHandler}, Config: {Config}.")]
internal static partial void ProducerCreateOk(this ILogger logger, IEnumerable<KeyValuePair<string, string>> config,
Type keyType, Type valueType);
Type keyType, Type valueType, bool oauthHandler, bool logHandler);

[LoggerMessage(
EventId = 502,
EventName = "ProducerCreateError",
Level = LogLevel.Error,
Message = "Producer create error. KeyType: {KeyType}, ValueType: {ValueType}, Config: {Config}.")]
Message = "Producer create error. KeyType: {KeyType}, ValueType: {ValueType}, Default OAuth Handler: {OauthHandler}, Default Log Handler: {LogHandler}, Config: {Config}.")]
internal static partial void ProducerCreateError(this ILogger logger, Exception exception,
IEnumerable<KeyValuePair<string, string>> config, Type keyType, Type valueType);
IEnumerable<KeyValuePair<string, string>> config, Type keyType, Type valueType, bool oauthHandler, bool logHandler);

[LoggerMessage(
EventId = 203,
8 changes: 6 additions & 2 deletions src/Epam.Kafka/Options/KafkaClusterOptions.cs
@@ -24,7 +24,8 @@ public sealed class KafkaClusterOptions : IOptions<KafkaClusterOptions>

internal bool UsedByFactory { get; set; }

internal Action<IClient, string>? OauthHandler { get; private set; }
internal Action<IClient, string?>? OauthHandler { get; private set; }
internal bool OauthHandlerThrow { get; private set; }
internal IAuthenticationHeaderValueProvider? AuthenticationHeaderValueProvider { get; set; }

KafkaClusterOptions IOptions<KafkaClusterOptions>.Value => this;
@@ -34,20 +35,23 @@ public sealed class KafkaClusterOptions : IOptions<KafkaClusterOptions>
/// It is triggered whenever OAUTHBEARER is the SASL
/// mechanism and a token needs to be retrieved.
/// </summary>
/// <remarks>Warning: https://github.com/confluentinc/confluent-kafka-dotnet/issues/2329</remarks>
/// <param name="createToken">
/// Function with following parameters:
/// <list type="string">Input: value of configuration property 'sasl.oauthbearer.config'.</list>
/// <list type="string">Output: Token refresh result represented by <see cref="OAuthRefreshResult" />.</list>
/// </param>
/// <param name="throwIfAlreadySet">Whether to throw an exception if the OAuth handler is already set.</param>
/// <returns>The <see cref="KafkaClusterOptions" />.</returns>
/// <exception cref="ArgumentNullException"></exception>
public KafkaClusterOptions WithOAuthHandler(Func<string, OAuthRefreshResult> createToken)
public KafkaClusterOptions WithOAuthHandler(Func<string?, OAuthRefreshResult> createToken, bool throwIfAlreadySet = false)
{
if (createToken == null)
{
throw new ArgumentNullException(nameof(createToken));
}

this.OauthHandlerThrow = throwIfAlreadySet;
this.OauthHandler = (client, s) =>
{
#pragma warning disable CA1031 // catch all exceptions and invoke error handler according to kafka client requirements
11 changes: 10 additions & 1 deletion tests/Epam.Kafka.Tests/Common/MockCluster.cs
@@ -25,7 +25,7 @@ public KafkaBuilder LaunchMockCluster(TestWithServices test)
return AddMockCluster(test, this._mockBootstrapServers);
}

public static KafkaBuilder AddMockCluster(TestWithServices test, string? server = null)
public static KafkaBuilder AddMockCluster(TestWithServices test, string? server = null, bool oauth = false)
{
test.ConfigurationBuilder.AddInMemoryCollection(GetDefaultFactoryConfig());

@@ -39,6 +39,15 @@ public static KafkaBuilder AddMockCluster(TestWithServices test, string? server
options.ClientConfig.BootstrapServers = server;
});
}

if (oauth)
{
kafkaBuilder.WithClusterConfig(ClusterName).Configure(options =>
{
options.ClientConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
options.ClientConfig.SaslMechanism = SaslMechanism.OAuthBearer;
});
}

kafkaBuilder.WithProducerConfig(DefaultProducer);

2 changes: 1 addition & 1 deletion tests/Epam.Kafka.Tests/Epam.Kafka.approved.txt
@@ -65,7 +65,7 @@ namespace Epam.Kafka.Options
public KafkaClusterOptions() { }
public Confluent.Kafka.ClientConfig ClientConfig { get; set; }
public Confluent.SchemaRegistry.SchemaRegistryConfig SchemaRegistryConfig { get; set; }
public Epam.Kafka.Options.KafkaClusterOptions WithOAuthHandler(System.Func<string, Epam.Kafka.OAuthRefreshResult> createToken) { }
public Epam.Kafka.Options.KafkaClusterOptions WithOAuthHandler(System.Func<string?, Epam.Kafka.OAuthRefreshResult> createToken, bool throwIfAlreadySet = false) { }
public Epam.Kafka.Options.KafkaClusterOptions WithSchemaRegistryAuthenticationHeader(Confluent.SchemaRegistry.IAuthenticationHeaderValueProvider provider) { }
}
public sealed class KafkaConsumerOptions : Microsoft.Extensions.Options.IOptions<Epam.Kafka.Options.KafkaConsumerOptions>
76 changes: 76 additions & 0 deletions tests/Epam.Kafka.Tests/KafkaFactoryTests.cs
@@ -108,6 +108,82 @@ public void CreateProducerConfig(string? name, string expectedGroup)
Assert.Single(config2);
}

[Fact]
public void CreateOauthConsumerCustom()
{
bool invoked = false;

var kafkaBuilder = MockCluster.AddMockCluster(this, oauth: true);
kafkaBuilder.WithConsumerConfig("any").Configure(x =>
{
x.ConsumerConfig.GroupId = "any";
x.ConsumerConfig.StatisticsIntervalMs = 5;
});

ConsumerConfig config = this.KafkaFactory.CreateConsumerConfig("any");

var consumer =
this.KafkaFactory.CreateConsumer<string, string>(config,
configure: b =>
b.SetOAuthBearerTokenRefreshHandler(
(_, _) => { invoked = true; }));

Assert.NotNull(consumer);

consumer.Consume(1000);

Assert.True(invoked);
}

[Fact]
public void CreateOauthConsumerDefault()
{
bool invoked = false;

var kafkaBuilder = MockCluster.AddMockCluster(this, oauth: true);
kafkaBuilder.WithConsumerConfig("any").Configure(x =>
{
x.ConsumerConfig.GroupId = "any";
x.ConsumerConfig.StatisticsIntervalMs = 5;
});
kafkaBuilder.WithClusterConfig(MockCluster.ClusterName).Configure(x => x.WithOAuthHandler(_ =>
{
invoked = true;
throw new ArithmeticException();
}));

ConsumerConfig config = this.KafkaFactory.CreateConsumerConfig("any");

var consumer =
this.KafkaFactory.CreateConsumer<string, string>(config);

Assert.NotNull(consumer);

consumer.Consume(1000);

Assert.True(invoked);
}

[Fact]
public void CreateOauthConsumerThrow()
{
var kafkaBuilder = MockCluster.AddMockCluster(this, oauth: true);
kafkaBuilder.WithConsumerConfig("any").Configure(x =>
{
x.ConsumerConfig.GroupId = "any";
x.ConsumerConfig.StatisticsIntervalMs = 5;
});
kafkaBuilder.WithClusterConfig(MockCluster.ClusterName)
.Configure(x => x.WithOAuthHandler(_ => throw new ArithmeticException(), true));

ConsumerConfig config = this.KafkaFactory.CreateConsumerConfig("any");

Assert.Throws<InvalidOperationException>(() => this.KafkaFactory.CreateConsumer<string, string>(config,
configure: b =>
b.SetOAuthBearerTokenRefreshHandler(
(_, _) => { })));
}

[Fact]
public void CreateConfigsWithPlaceholders()
{