diff --git a/.editorconfig b/.editorconfig
index b72cac1..a8bdc24 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -62,6 +62,7 @@ dotnet_diagnostic.CA1507.severity = warning
dotnet_diagnostic.CA1805.severity = warning
csharp_style_prefer_primary_constructors = false:suggestion
csharp_style_prefer_readonly_struct_member = true:suggestion
+csharp_prefer_system_threading_lock = true:suggestion
[*.{cs,vb}]
#### Naming styles ####
@@ -143,6 +144,9 @@ dotnet_style_qualification_for_field = true:suggestion
dotnet_style_qualification_for_property = true:suggestion
dotnet_style_qualification_for_method = true:suggestion
dotnet_style_qualification_for_event = true:suggestion
+csharp_style_unused_value_expression_statement_preference = discard_variable:none
+dotnet_diagnostic.IDE0055.severity = none
+dotnet_diagnostic.IDE0057.severity = none
dotnet_diagnostic.CA1051.severity = warning
dotnet_diagnostic.CA1068.severity = warning
dotnet_diagnostic.CA1069.severity = error
@@ -178,7 +182,7 @@ dotnet_diagnostic.CA3075.severity = warning
dotnet_diagnostic.CA2201.severity = warning
dotnet_diagnostic.CA2219.severity = warning
dotnet_diagnostic.CA2251.severity = suggestion
-dotnet_style_prefer_collection_expression = false:warning
+dotnet_style_prefer_collection_expression = never:warning
dotnet_diagnostic.CA1510.severity = none
dotnet_diagnostic.CA1511.severity = none
dotnet_diagnostic.CA1512.severity = none
diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml
index 9f75430..3feb71a 100644
--- a/.github/workflows/dotnet.yml
+++ b/.github/workflows/dotnet.yml
@@ -32,7 +32,7 @@ jobs:
- name: Setup .NET
uses: actions/setup-dotnet@v4
with:
- dotnet-version: 8.0.x
+ dotnet-version: 9.0.x
- name: Restore
run: dotnet restore
- name: Build
@@ -85,7 +85,7 @@ jobs:
- name: Setup .NET
uses: actions/setup-dotnet@v4
with:
- dotnet-version: 8.0.x
+ dotnet-version: 9.0.x
source-url: https://api.nuget.org/v3/index.json
env:
NUGET_AUTH_TOKEN: ${{secrets.NUGET_TOKEN}}
diff --git a/sample/Epam.Kafka.Sample.Net462/App.config b/sample/Epam.Kafka.Sample.Net462/App.config
index 0e486a3..8e4688e 100644
--- a/sample/Epam.Kafka.Sample.Net462/App.config
+++ b/sample/Epam.Kafka.Sample.Net462/App.config
@@ -15,7 +15,11 @@
-
+
+
+
+
+
diff --git a/sample/Epam.Kafka.Sample.Net462/Epam.Kafka.Sample.Net462.csproj b/sample/Epam.Kafka.Sample.Net462/Epam.Kafka.Sample.Net462.csproj
index ef9aaea..b91a98b 100644
--- a/sample/Epam.Kafka.Sample.Net462/Epam.Kafka.Sample.Net462.csproj
+++ b/sample/Epam.Kafka.Sample.Net462/Epam.Kafka.Sample.Net462.csproj
@@ -196,8 +196,8 @@
packages\System.Text.Encodings.Web.8.0.0\lib\net462\System.Text.Encodings.Web.dll
True
-
- packages\System.Text.Json.8.0.4\lib\net462\System.Text.Json.dll
+
+ packages\System.Text.Json.8.0.5\lib\net462\System.Text.Json.dll
True
diff --git a/sample/Epam.Kafka.Sample.Net462/packages.config b/sample/Epam.Kafka.Sample.Net462/packages.config
index 117dcc9..6f8183f 100644
--- a/sample/Epam.Kafka.Sample.Net462/packages.config
+++ b/sample/Epam.Kafka.Sample.Net462/packages.config
@@ -45,7 +45,7 @@
-
+
\ No newline at end of file
diff --git a/sample/Epam.Kafka.Sample/Epam.Kafka.Sample.csproj b/sample/Epam.Kafka.Sample/Epam.Kafka.Sample.csproj
index 9d58420..a6cf44a 100644
--- a/sample/Epam.Kafka.Sample/Epam.Kafka.Sample.csproj
+++ b/sample/Epam.Kafka.Sample/Epam.Kafka.Sample.csproj
@@ -20,10 +20,10 @@
-
-
-
-
+
+
+
+
diff --git a/src/Directory.Build.props b/src/Directory.Build.props
index 75b1dfb..fbccef3 100644
--- a/src/Directory.Build.props
+++ b/src/Directory.Build.props
@@ -25,7 +25,7 @@
en
https://github.com/epam/epam-kafka/releases
0
- 2.3.$(VersionPrefix)
+ 2.4.$(VersionPrefix)
diff --git a/src/Epam.Kafka.PubSub.EntityFramework6/CompatibilityExtensions.cs b/src/Epam.Kafka.PubSub.EntityFramework6/CompatibilityExtensions.cs
index cbf9090..4c90a17 100644
--- a/src/Epam.Kafka.PubSub.EntityFramework6/CompatibilityExtensions.cs
+++ b/src/Epam.Kafka.PubSub.EntityFramework6/CompatibilityExtensions.cs
@@ -13,7 +13,7 @@ public static IQueryable AsTracking(this IQueryable queryable)
return queryable;
}
- public static int SaveChanges(this DbContext context, bool acceptAllChangesOnSuccess)
+ public static int SaveChanges(this DbContext context, bool _)
{
return context.SaveChanges();
}
diff --git a/src/Epam.Kafka.PubSub.EntityFrameworkCore/Epam.Kafka.PubSub.EntityFrameworkCore.csproj b/src/Epam.Kafka.PubSub.EntityFrameworkCore/Epam.Kafka.PubSub.EntityFrameworkCore.csproj
index 2997577..933ffcf 100644
--- a/src/Epam.Kafka.PubSub.EntityFrameworkCore/Epam.Kafka.PubSub.EntityFrameworkCore.csproj
+++ b/src/Epam.Kafka.PubSub.EntityFrameworkCore/Epam.Kafka.PubSub.EntityFrameworkCore.csproj
@@ -10,11 +10,11 @@
-
+
-
+
diff --git a/src/Epam.Kafka.PubSub.EntityFrameworkCore/Publication/Contracts/DbContextEntityPublicationHandler.cs b/src/Epam.Kafka.PubSub.EntityFrameworkCore/Publication/Contracts/DbContextEntityPublicationHandler.cs
index 9e4e52a..ccf31f3 100644
--- a/src/Epam.Kafka.PubSub.EntityFrameworkCore/Publication/Contracts/DbContextEntityPublicationHandler.cs
+++ b/src/Epam.Kafka.PubSub.EntityFrameworkCore/Publication/Contracts/DbContextEntityPublicationHandler.cs
@@ -84,10 +84,12 @@ protected override void TransactionCommitted(IReadOnlyCollection entiti
}
catch (DbUpdateConcurrencyException exception)
{
+#pragma warning disable IDE0008 // Use explicit type not possible due to #if directives
foreach (var entry in exception.Entries)
{
this.Logger.PublicationEntityDetached(exception, "Commit", this.FindPrimaryKeyForLogs(entry), typeof(TEntity));
}
+#pragma warning restore IDE0008
}
}
diff --git a/src/Epam.Kafka.PubSub.EntityFrameworkCore/Subscription/DbContextOffsetsStorage.cs b/src/Epam.Kafka.PubSub.EntityFrameworkCore/Subscription/DbContextOffsetsStorage.cs
index cc70bed..181db17 100644
--- a/src/Epam.Kafka.PubSub.EntityFrameworkCore/Subscription/DbContextOffsetsStorage.cs
+++ b/src/Epam.Kafka.PubSub.EntityFrameworkCore/Subscription/DbContextOffsetsStorage.cs
@@ -127,6 +127,7 @@ public IReadOnlyCollection CommitOrReset(
{
cancellationToken.ThrowIfCancellationRequested();
+#pragma warning disable IDE0008 // Use explicit type not possible due to #if directives
foreach (var entry in exception.Entries)
{
if (entry.Entity is KafkaTopicState)
@@ -152,6 +153,7 @@ public IReadOnlyCollection CommitOrReset(
}
}
}
+#pragma warning restore IDE0008
this._context.SaveChanges(true);
}
diff --git a/src/Epam.Kafka.PubSub.EntityFrameworkCore/Subscription/State/KafkaTopicStateExtensions.cs b/src/Epam.Kafka.PubSub.EntityFrameworkCore/Subscription/State/KafkaTopicStateExtensions.cs
index 5200e6c..7a45a38 100644
--- a/src/Epam.Kafka.PubSub.EntityFrameworkCore/Subscription/State/KafkaTopicStateExtensions.cs
+++ b/src/Epam.Kafka.PubSub.EntityFrameworkCore/Subscription/State/KafkaTopicStateExtensions.cs
@@ -1,5 +1,8 @@
// Copyright © 2024 EPAM Systems
+#pragma warning disable IDE0065
+#pragma warning disable IDE0008
+
#if EF6
namespace Epam.Kafka.PubSub.EntityFramework6.Subscription.State;
#else
@@ -45,4 +48,7 @@ public static ETBuilder AddKafkaState(this ModelBuilder builder)
return e;
}
-}
\ No newline at end of file
+}
+
+#pragma warning restore IDE0065
+#pragma warning restore IDE0008
\ No newline at end of file
diff --git a/src/Epam.Kafka.PubSub/Common/HealthChecks/PubSubHealthCheck.cs b/src/Epam.Kafka.PubSub/Common/HealthChecks/PubSubHealthCheck.cs
index 25c7961..7f108c2 100644
--- a/src/Epam.Kafka.PubSub/Common/HealthChecks/PubSubHealthCheck.cs
+++ b/src/Epam.Kafka.PubSub/Common/HealthChecks/PubSubHealthCheck.cs
@@ -52,7 +52,13 @@ protected PubSubHealthCheck(PubSubOptions options)
default:
throw new InvalidOperationException(
$"Unknown pipeline status {monitor.Pipeline.Value}. Supported values: " +
- string.Join(", ", Enum.GetNames(typeof(PipelineStatus))));
+ string.Join(", ",
+#if NET6_0_OR_GREATER
+ Enum.GetNames()
+#else
+ Enum.GetNames(typeof(PipelineStatus))
+#endif
+ ));
}
return Task.FromResult(new HealthCheckResult(
diff --git a/src/Epam.Kafka.PubSub/Common/PubSubBackgroundService.cs b/src/Epam.Kafka.PubSub/Common/PubSubBackgroundService.cs
index 2032b39..8489389 100644
--- a/src/Epam.Kafka.PubSub/Common/PubSubBackgroundService.cs
+++ b/src/Epam.Kafka.PubSub/Common/PubSubBackgroundService.cs
@@ -59,7 +59,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await Task.Yield();
- if (this.Options.Enabled == false)
+ if (!this.Options.Enabled)
{
this.Monitor.Pipeline.Update(PipelineStatus.Disabled);
return;
diff --git a/src/Epam.Kafka.PubSub/Publication/Options/PublicationOptionsValidate.cs b/src/Epam.Kafka.PubSub/Publication/Options/PublicationOptionsValidate.cs
index 9514c4e..3b14961 100644
--- a/src/Epam.Kafka.PubSub/Publication/Options/PublicationOptionsValidate.cs
+++ b/src/Epam.Kafka.PubSub/Publication/Options/PublicationOptionsValidate.cs
@@ -16,7 +16,7 @@ public ValidateOptionsResult Validate(string? name, PublicationOptions options)
throw new ArgumentNullException(nameof(options));
}
- if (options.Enabled == false)
+ if (!options.Enabled)
{
return ValidateOptionsResult.Success;
}
diff --git a/src/Epam.Kafka.PubSub/Publication/PublicationBackgroundService.cs b/src/Epam.Kafka.PubSub/Publication/PublicationBackgroundService.cs
index 11e3e7e..3ee336f 100644
--- a/src/Epam.Kafka.PubSub/Publication/PublicationBackgroundService.cs
+++ b/src/Epam.Kafka.PubSub/Publication/PublicationBackgroundService.cs
@@ -80,16 +80,22 @@ protected override IPublicationTopicWrapper CreateTopicWrapper()
bool implicitPreprocessor = ks != null || vs != null || config.TransactionalId != null;
+ IPublicationTopicWrapper result;
+
if (this.Options.SerializationPreprocessor ?? implicitPreprocessor)
{
- return new PublicationSerializeKeyAndValueTopicWrapper(this.KafkaFactory, this.Monitor,
+ result = new PublicationSerializeKeyAndValueTopicWrapper(this.KafkaFactory, this.Monitor,
config, this.Options, this.Logger,
ks, vs, this.Options.Partitioner);
}
+ else
+ {
+ result = new PublicationTopicWrapper(this.KafkaFactory, this.Monitor, config, this.Options,
+ this.Logger,
+ ks, vs, this.Options.Partitioner);
+ }
- return new PublicationTopicWrapper(this.KafkaFactory, this.Monitor, config, this.Options,
- this.Logger,
- ks, vs, this.Options.Partitioner);
+ return result;
}
protected override TimeSpan? GetBatchFinishedTimeout(PublicationBatchResult subBatchResult)
diff --git a/src/Epam.Kafka.PubSub/Publication/Topics/PublicationTopicWrapper.cs b/src/Epam.Kafka.PubSub/Publication/Topics/PublicationTopicWrapper.cs
index e99308a..2a2cf19 100644
--- a/src/Epam.Kafka.PubSub/Publication/Topics/PublicationTopicWrapper.cs
+++ b/src/Epam.Kafka.PubSub/Publication/Topics/PublicationTopicWrapper.cs
@@ -77,6 +77,9 @@ public PublicationTopicWrapper(
b.SetValueSerializer(valueSerializer);
}
});
+
+ // warmup to avoid potential issues with OAuth handler
+ this.Producer.Poll(TimeSpan.Zero);
}
private static void ConfigureReports(ProducerConfig config)
diff --git a/src/Epam.Kafka.PubSub/Subscription/Options/SubscriptionOptionsValidate.cs b/src/Epam.Kafka.PubSub/Subscription/Options/SubscriptionOptionsValidate.cs
index 08b46a5..e54ac85 100644
--- a/src/Epam.Kafka.PubSub/Subscription/Options/SubscriptionOptionsValidate.cs
+++ b/src/Epam.Kafka.PubSub/Subscription/Options/SubscriptionOptionsValidate.cs
@@ -17,7 +17,7 @@ public ValidateOptionsResult Validate(string? name, SubscriptionOptions options)
throw new ArgumentNullException(nameof(options));
}
- if (options.Enabled == false)
+ if (!options.Enabled)
{
return ValidateOptionsResult.Success;
}
diff --git a/src/Epam.Kafka.PubSub/Subscription/Pipeline/SubscriptionBatchResult.cs b/src/Epam.Kafka.PubSub/Subscription/Pipeline/SubscriptionBatchResult.cs
index 12fa698..0cd1ece 100644
--- a/src/Epam.Kafka.PubSub/Subscription/Pipeline/SubscriptionBatchResult.cs
+++ b/src/Epam.Kafka.PubSub/Subscription/Pipeline/SubscriptionBatchResult.cs
@@ -1,7 +1,5 @@
// Copyright © 2024 EPAM Systems
-using Confluent.Kafka;
-
using Epam.Kafka.PubSub.Common.Pipeline;
namespace Epam.Kafka.PubSub.Subscription.Pipeline;
diff --git a/src/Epam.Kafka.PubSub/Subscription/State/ExternalState.cs b/src/Epam.Kafka.PubSub/Subscription/State/ExternalState.cs
index 70139b0..29f64e4 100644
--- a/src/Epam.Kafka.PubSub/Subscription/State/ExternalState.cs
+++ b/src/Epam.Kafka.PubSub/Subscription/State/ExternalState.cs
@@ -40,7 +40,7 @@ protected override void AssignConsumer(SubscriptionTopicWrapper CommitState(
- SubscriptionTopicWrapper topic,
- TopicPartitionOffset item,
+ SubscriptionTopicWrapper topic,
+ TopicPartitionOffset item,
List pause,
List reset)
{
@@ -112,7 +112,7 @@ public static void CommitOffsetIfNeeded(
}
else if (item.Offset == Offset.End)
{
- var w = topic.Consumer.QueryWatermarkOffsets(item.TopicPartition, TimeSpan.FromSeconds(5));
+ WatermarkOffsets w = topic.Consumer.QueryWatermarkOffsets(item.TopicPartition, TimeSpan.FromSeconds(5));
toCommit.Add(new TopicPartitionOffset(item.TopicPartition, w.High));
}
}
diff --git a/src/Epam.Kafka.PubSub/Subscription/SubscriptionBackgroundService.cs b/src/Epam.Kafka.PubSub/Subscription/SubscriptionBackgroundService.cs
index b7e2ade..9e994c4 100644
--- a/src/Epam.Kafka.PubSub/Subscription/SubscriptionBackgroundService.cs
+++ b/src/Epam.Kafka.PubSub/Subscription/SubscriptionBackgroundService.cs
@@ -101,7 +101,7 @@ protected override void ExecuteBatch(
this.Monitor.Batch.Update(BatchStatus.Reading);
bool unassignedBeforeRead = state.GetBatch(
- topic, activitySpan, out var batch, cancellationToken);
+ topic, activitySpan, out IReadOnlyCollection> batch, cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
diff --git a/src/Epam.Kafka.PubSub/Subscription/Topics/SubscriptionTopicWrapper.cs b/src/Epam.Kafka.PubSub/Subscription/Topics/SubscriptionTopicWrapper.cs
index aa89017..12bd267 100644
--- a/src/Epam.Kafka.PubSub/Subscription/Topics/SubscriptionTopicWrapper.cs
+++ b/src/Epam.Kafka.PubSub/Subscription/Topics/SubscriptionTopicWrapper.cs
@@ -67,6 +67,10 @@ public SubscriptionTopicWrapper(IKafkaFactory kafkaFactory,
this.ConfigureConsumerBuilder(b);
});
+
+ // Warmup consumer to avoid potential issues with OAuth handler.
+ // Consumer just created, so not assigned to any partition.
+ this.Consumer.Consume(100);
}
public SubscriptionMonitor Monitor { get; }
@@ -79,7 +83,10 @@ public SubscriptionTopicWrapper(IKafkaFactory kafkaFactory,
public Func, IReadOnlyCollection>? ExternalState { get; set; }
- public bool TryGetOffset(TopicPartition tp, out Offset result) => this._offsets.TryGetValue(tp, out result);
+ public bool TryGetOffset(TopicPartition tp, out Offset result)
+ {
+ return this._offsets.TryGetValue(tp, out result);
+ }
public void OnAssign(IReadOnlyCollection items)
{
@@ -229,7 +236,7 @@ public void OnReset(IReadOnlyCollection items)
List reset = new(items.Count);
List resume = new(items.Count);
- foreach (var tpo in items)
+ foreach (TopicPartitionOffset tpo in items)
{
if (this._paused.Remove(tpo.TopicPartition))
{
@@ -285,7 +292,7 @@ private bool OnPauseEnumerate(IEnumerable items)
throw;
}
- foreach (var r in result)
+ foreach (TopicPartition r in result)
{
this._paused.Add(r);
this._offsets[r] = ExternalOffset.Paused;
@@ -485,6 +492,7 @@ private void ReadToBuffer(ActivityWrapper span, CancellationToken cancellationTo
{
ConsumeResult record = consumeException.ConsumerRecord;
+#pragma warning disable IDE0010 // Add missing cases
switch (consumeException.Error.Code)
{
case ErrorCode.Local_Fatal:
@@ -507,6 +515,7 @@ private void ReadToBuffer(ActivityWrapper span, CancellationToken cancellationTo
throw;
}
}
+#pragma warning restore IDE0010 // Add missing cases
// unable to return at least something, so only throw
if (this._buffer.Count == 0)
diff --git a/src/Epam.Kafka.PubSub/Utils/ActivityWrapper.cs b/src/Epam.Kafka.PubSub/Utils/ActivityWrapper.cs
index 601d341..926f41b 100644
--- a/src/Epam.Kafka.PubSub/Utils/ActivityWrapper.cs
+++ b/src/Epam.Kafka.PubSub/Utils/ActivityWrapper.cs
@@ -5,6 +5,7 @@
using System.Text;
namespace Epam.Kafka.PubSub.Utils;
+
#pragma warning disable CA2000 // StartActivity returns same activity object, so it will be disposed.
internal sealed class ActivityWrapper : IDisposable
{
diff --git a/src/Epam.Kafka/Internals/KafkaFactory.cs b/src/Epam.Kafka/Internals/KafkaFactory.cs
index a56483a..c984e29 100644
--- a/src/Epam.Kafka/Internals/KafkaFactory.cs
+++ b/src/Epam.Kafka/Internals/KafkaFactory.cs
@@ -121,41 +121,54 @@ public IConsumer CreateConsumer(ConsumerConfig confi
configure?.Invoke(builder);
+ bool oauthSet = false;
+ bool logSet = false;
+
+ try
+ {
+ builder.SetLogHandler((_, m) => this._loggerFactory.CreateLogger(logHandler).KafkaLogHandler(m));
+ logSet = true;
+ }
+ catch (InvalidOperationException)
+ {
+ // handler already set
+ }
+
if (clusterOptions is { OauthHandler: { }, ClientConfig.SaslMechanism: SaslMechanism.OAuthBearer })
{
try
{
builder.SetOAuthBearerTokenRefreshHandler(clusterOptions.OauthHandler.Invoke);
+ oauthSet = true;
}
catch (InvalidOperationException)
{
- } // handler already set
+ // handler already set
+ if (clusterOptions.OauthHandlerThrow)
+ {
+ throw;
+ }
+ }
}
- try
- {
- builder.SetLogHandler((_, m) => this._loggerFactory.CreateLogger(logHandler).KafkaLogHandler(m));
- }
- catch (InvalidOperationException)
- {
- } // handler already set
+ ILogger logger = this._loggerFactory.CreateLogger(LoggerCategoryName);
- ILogger fl = this._loggerFactory.CreateLogger(LoggerCategoryName);
+ ObservableConsumer consumer;
try
{
- IConsumer consumer = new ObservableConsumer(builder);
-
- fl.ConsumerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue));
+ consumer = new ObservableConsumer(builder);
- return consumer;
+ logger.ConsumerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue), oauthSet, logSet);
}
catch (Exception exc)
{
- fl.ConsumerCreateError(exc, PrepareConfigForLogs(config), typeof(TKey), typeof(TValue));
+ logger.ConsumerCreateError(exc, PrepareConfigForLogs(config), typeof(TKey), typeof(TValue), oauthSet, logSet);
throw;
}
+
+ return consumer;
}
public IProducer CreateProducer(ProducerConfig config, string? cluster = null,
@@ -177,41 +190,54 @@ public IProducer CreateProducer(ProducerConfig confi
configure?.Invoke(builder);
+ bool oauthSet = false;
+ bool logSet = false;
+
+ try
+ {
+ builder.SetLogHandler((_, m) => this._loggerFactory.CreateLogger(logHandler).KafkaLogHandler(m));
+ logSet = true;
+ }
+ catch (InvalidOperationException)
+ {
+ // handler already set
+ }
+
if (clusterOptions is { OauthHandler: { }, ClientConfig.SaslMechanism: SaslMechanism.OAuthBearer })
{
try
{
builder.SetOAuthBearerTokenRefreshHandler(clusterOptions.OauthHandler);
+ oauthSet = true;
}
catch (InvalidOperationException)
{
- } // handler already set
+ // handler already set
+ if (clusterOptions.OauthHandlerThrow)
+ {
+ throw;
+ }
+ }
}
- try
- {
- builder.SetLogHandler((_, m) => this._loggerFactory.CreateLogger(logHandler).KafkaLogHandler(m));
- }
- catch (InvalidOperationException)
- {
- } // handler already set
+ ILogger logger = this._loggerFactory.CreateLogger(LoggerCategoryName);
- ILogger fl = this._loggerFactory.CreateLogger(LoggerCategoryName);
+ ObservableProducer producer;
try
{
- ObservableProducer producer = new(builder);
+ producer = new(builder);
- fl.ProducerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue));
-
- return producer;
+ logger.ProducerCreateOk(PrepareConfigForLogs(config), typeof(TKey), typeof(TValue), oauthSet, logSet);
}
catch (Exception exc)
{
- fl.ProducerCreateError(exc, PrepareConfigForLogs(config), typeof(TKey), typeof(TValue));
+ logger.ProducerCreateError(exc, PrepareConfigForLogs(config), typeof(TKey), typeof(TValue), oauthSet, logSet);
throw;
}
+
+ return producer;
}
public IClient GetOrCreateClient(string? cluster = null)
@@ -220,15 +246,16 @@ public IClient GetOrCreateClient(string? cluster = null)
KafkaClusterOptions clusterOptions = this.GetAndValidateClusterOptions(cluster);
- SharedClient? result;
-
- lock (this._syncObj)
+ if (!this._clients.TryGetValue(clusterOptions, out SharedClient? result))
{
- if (!this._clients.TryGetValue(clusterOptions, out result))
+ lock (this._syncObj)
{
- result = new SharedClient(this, cluster);
+ if (!this._clients.TryGetValue(clusterOptions, out result))
+ {
+ result = new SharedClient(this, cluster);
- this._clients.Add(clusterOptions, result);
+ this._clients.Add(clusterOptions, result);
+ }
}
}
@@ -241,16 +268,31 @@ public ISchemaRegistryClient GetOrCreateSchemaRegistryClient(string? cluster = n
KafkaClusterOptions clusterOptions = this.GetAndValidateClusterOptions(cluster);
- CachedSchemaRegistryClient? result;
-
- lock (this._syncObj)
+ if (!this._registries.TryGetValue(clusterOptions, out CachedSchemaRegistryClient? result))
{
- if (!this._registries.TryGetValue(clusterOptions, out result))
- {
- result = new CachedSchemaRegistryClient(clusterOptions.SchemaRegistryConfig,
- clusterOptions.AuthenticationHeaderValueProvider);
+ ILogger logger = this._loggerFactory.CreateLogger(LoggerCategoryName);
- this._registries.Add(clusterOptions, result);
+ lock (this._syncObj)
+ {
+ if (!this._registries.TryGetValue(clusterOptions, out result))
+ {
+ try
+ {
+ result = new CachedSchemaRegistryClient(clusterOptions.SchemaRegistryConfig,
+ clusterOptions.AuthenticationHeaderValueProvider);
+
+ this._registries.Add(clusterOptions, result);
+
+ logger.RegistryClientCreateOk(PrepareConfigForLogs(clusterOptions.SchemaRegistryConfig),
+ clusterOptions.AuthenticationHeaderValueProvider?.GetType());
+ }
+ catch (Exception exception)
+ {
+ logger.RegistryClientCreateError(exception, clusterOptions.SchemaRegistryConfig,
+ clusterOptions.AuthenticationHeaderValueProvider?.GetType());
+ throw;
+ }
+ }
}
}
@@ -265,7 +307,7 @@ private static void ValidateLogicalName(string? configName, string entityType)
}
}
- private static IEnumerable> PrepareConfigForLogs(Config config)
+ private static IEnumerable> PrepareConfigForLogs(IEnumerable> config)
{
return config.Select(x => Contains(x, "password") || Contains(x, "secret")
? new KeyValuePair(x.Key, "*******")
@@ -292,7 +334,15 @@ private KafkaClusterOptions GetAndValidateClusterOptions(string? cluster)
ValidateLogicalName(cluster, "cluster");
// save cluster name for further health check
- this.UsedClusters.Add(cluster!);
+ // https://learn.microsoft.com/en-us/dotnet/standard/collections/thread-safe/
+ // If you're only reading from a shared collection, then you can use the classes in the System.Collections.Generic namespace
+ if (!this.UsedClusters.Contains(cluster!))
+ {
+ lock (this.UsedClusters)
+ {
+ this.UsedClusters.Add(cluster!);
+ }
+ }
try
{
diff --git a/src/Epam.Kafka/LogExtensions.cs b/src/Epam.Kafka/LogExtensions.cs
index f8cdba9..a17bbdc 100644
--- a/src/Epam.Kafka/LogExtensions.cs
+++ b/src/Epam.Kafka/LogExtensions.cs
@@ -58,31 +58,46 @@ static partial void KafkaLogHandler(this ILogger logger, LogLevel level, string
EventId = 201,
EventName = "ConsumerCreated",
Level = LogLevel.Information,
- Message = "Consumer created. KeyType: {KeyType}, ValueType: {ValueType}, Config: {Config}.")]
+ Message = "Consumer created. KeyType: {KeyType}, ValueType: {ValueType}, Default OAuth Handler: {OauthHandler}, Default Log Handler: {LogHandler}, Config: {Config}.")]
internal static partial void ConsumerCreateOk(this ILogger logger, IEnumerable> config,
- Type keyType, Type valueType);
+ Type keyType, Type valueType, bool oauthHandler, bool logHandler);
[LoggerMessage(
EventId = 501,
EventName = "ConsumerCreateError",
Level = LogLevel.Error,
- Message = "Consumer create error. KeyType: {KeyType}, ValueType: {ValueType}, Config: {Config}.")]
+ Message = "Consumer create error. KeyType: {KeyType}, ValueType: {ValueType}, Default OAuth Handler: {OauthHandler}, Default Log Handler: {LogHandler}, Config: {Config}.")]
internal static partial void ConsumerCreateError(this ILogger logger, Exception exception,
- IEnumerable> config, Type keyType, Type valueType);
+ IEnumerable> config, Type keyType, Type valueType, bool oauthHandler, bool logHandler);
[LoggerMessage(
EventId = 202,
EventName = "ProducerCreated",
Level = LogLevel.Information,
- Message = "Producer created. KeyType: {KeyType}, ValueType: {ValueType}, Config: {Config}.")]
+ Message = "Producer created. KeyType: {KeyType}, ValueType: {ValueType}, Default OAuth Handler: {OauthHandler}, Default Log Handler: {LogHandler}, Config: {Config}.")]
internal static partial void ProducerCreateOk(this ILogger logger, IEnumerable> config,
- Type keyType, Type valueType);
+ Type keyType, Type valueType, bool oauthHandler, bool logHandler);
[LoggerMessage(
EventId = 502,
EventName = "ProducerCreateError",
Level = LogLevel.Error,
- Message = "Producer create error. KeyType: {KeyType}, ValueType: {ValueType}, Config: {Config}.")]
+ Message = "Producer create error. KeyType: {KeyType}, ValueType: {ValueType}, Default OAuth Handler: {OauthHandler}, Default Log Handler: {LogHandler}, Config: {Config}.")]
internal static partial void ProducerCreateError(this ILogger logger, Exception exception,
- IEnumerable> config, Type keyType, Type valueType);
+ IEnumerable> config, Type keyType, Type valueType, bool oauthHandler, bool logHandler);
+
+ [LoggerMessage(
+ EventId = 203,
+ EventName = "RegistryClientCreated",
+ Level = LogLevel.Information,
+ Message = "Registry client created. HeaderProviderType: {HeaderProviderType}, Config: {Config}.")]
+ internal static partial void RegistryClientCreateOk(this ILogger logger, IEnumerable> config, Type? headerProviderType);
+
+ [LoggerMessage(
+ EventId = 503,
+ EventName = "RegistryClientCreateError",
+ Level = LogLevel.Error,
+ Message = "Registry client create error. HeaderProviderType: {HeaderProviderType}, Config: {Config}.")]
+ internal static partial void RegistryClientCreateError(this ILogger logger, Exception exception,
+ IEnumerable> config, Type? headerProviderType);
}
\ No newline at end of file
diff --git a/src/Epam.Kafka/Options/Configuration/OptionsFromConfiguration.cs b/src/Epam.Kafka/Options/Configuration/OptionsFromConfiguration.cs
index 5445171..6d6127e 100644
--- a/src/Epam.Kafka/Options/Configuration/OptionsFromConfiguration.cs
+++ b/src/Epam.Kafka/Options/Configuration/OptionsFromConfiguration.cs
@@ -1,9 +1,5 @@
// Copyright © 2024 EPAM Systems
-#if !NET6_0_OR_GREATER
-using Epam.Kafka.Internals;
-#endif
-
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;
@@ -35,7 +31,7 @@ public void Configure(string? name, TOptions options)
return;
}
- IConfigurationSection section = this._configuration.GetSection(this.ParentSectionName).GetSection(name);
+ IConfigurationSection section = this._configuration.GetSection(this.ParentSectionName).GetSection(name!);
if (section.Exists())
{
diff --git a/src/Epam.Kafka/Options/KafkaClusterOptions.cs b/src/Epam.Kafka/Options/KafkaClusterOptions.cs
index 3eae871..fbed6b1 100644
--- a/src/Epam.Kafka/Options/KafkaClusterOptions.cs
+++ b/src/Epam.Kafka/Options/KafkaClusterOptions.cs
@@ -24,7 +24,8 @@ public sealed class KafkaClusterOptions : IOptions
internal bool UsedByFactory { get; set; }
- internal Action? OauthHandler { get; private set; }
+ internal Action? OauthHandler { get; private set; }
+ internal bool OauthHandlerThrow { get; private set; }
internal IAuthenticationHeaderValueProvider? AuthenticationHeaderValueProvider { get; set; }
KafkaClusterOptions IOptions.Value => this;
@@ -34,20 +35,23 @@ public sealed class KafkaClusterOptions : IOptions
/// It is triggered whenever OAUTHBEARER is the SASL
/// mechanism and a token needs to be retrieved.
///
+ /// Warning: https://github.com/confluentinc/confluent-kafka-dotnet/issues/2329
///
/// Function with following parameters:
/// Input: value of configuration property 'sasl.oauthbearer.config'.
/// Output: Token refresh result represented by .
///
+ /// Whether to throw exception if OAuth handler already set.
/// The .
///
- public KafkaClusterOptions WithOAuthHandler(Func createToken)
+ public KafkaClusterOptions WithOAuthHandler(Func createToken, bool throwIfAlreadySet = false)
{
if (createToken == null)
{
throw new ArgumentNullException(nameof(createToken));
}
+ this.OauthHandlerThrow = throwIfAlreadySet;
this.OauthHandler = (client, s) =>
{
#pragma warning disable CA1031 // catch all exceptions and invoke error handler according to kafka client requirements
diff --git a/tests/Directory.Build.props b/tests/Directory.Build.props
index 596b136..49aa3fe 100644
--- a/tests/Directory.Build.props
+++ b/tests/Directory.Build.props
@@ -8,8 +8,8 @@
-
-
+
+
diff --git a/tests/Epam.Kafka.PubSub.EntityFramework6.Tests/Epam.Kafka.PubSub.EntityFramework6.Tests.csproj b/tests/Epam.Kafka.PubSub.EntityFramework6.Tests/Epam.Kafka.PubSub.EntityFramework6.Tests.csproj
index 7234287..5d0d249 100644
--- a/tests/Epam.Kafka.PubSub.EntityFramework6.Tests/Epam.Kafka.PubSub.EntityFramework6.Tests.csproj
+++ b/tests/Epam.Kafka.PubSub.EntityFramework6.Tests/Epam.Kafka.PubSub.EntityFramework6.Tests.csproj
@@ -30,6 +30,7 @@
+
diff --git a/tests/Epam.Kafka.PubSub.EntityFrameworkCore.Tests/Epam.Kafka.PubSub.EntityFrameworkCore.Tests.csproj b/tests/Epam.Kafka.PubSub.EntityFrameworkCore.Tests/Epam.Kafka.PubSub.EntityFrameworkCore.Tests.csproj
index d0df273..f95fc1f 100644
--- a/tests/Epam.Kafka.PubSub.EntityFrameworkCore.Tests/Epam.Kafka.PubSub.EntityFrameworkCore.Tests.csproj
+++ b/tests/Epam.Kafka.PubSub.EntityFrameworkCore.Tests/Epam.Kafka.PubSub.EntityFrameworkCore.Tests.csproj
@@ -6,11 +6,11 @@
-
+
-
+
diff --git a/tests/Epam.Kafka.Tests/Common/MockCluster.cs b/tests/Epam.Kafka.Tests/Common/MockCluster.cs
index 57e64e1..e24c125 100644
--- a/tests/Epam.Kafka.Tests/Common/MockCluster.cs
+++ b/tests/Epam.Kafka.Tests/Common/MockCluster.cs
@@ -25,7 +25,7 @@ public KafkaBuilder LaunchMockCluster(TestWithServices test)
return AddMockCluster(test, this._mockBootstrapServers);
}
- public static KafkaBuilder AddMockCluster(TestWithServices test, string? server = null)
+ public static KafkaBuilder AddMockCluster(TestWithServices test, string? server = null, bool oauth = false)
{
test.ConfigurationBuilder.AddInMemoryCollection(GetDefaultFactoryConfig());
@@ -39,6 +39,15 @@ public static KafkaBuilder AddMockCluster(TestWithServices test, string? server
options.ClientConfig.BootstrapServers = server;
});
}
+
+ if (oauth)
+ {
+ kafkaBuilder.WithClusterConfig(ClusterName).Configure(options =>
+ {
+ options.ClientConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
+ options.ClientConfig.SaslMechanism = SaslMechanism.OAuthBearer;
+ });
+ }
kafkaBuilder.WithProducerConfig(DefaultProducer);
diff --git a/tests/Epam.Kafka.Tests/Epam.Kafka.approved.txt b/tests/Epam.Kafka.Tests/Epam.Kafka.approved.txt
index 22f4059..8fcdd81 100644
--- a/tests/Epam.Kafka.Tests/Epam.Kafka.approved.txt
+++ b/tests/Epam.Kafka.Tests/Epam.Kafka.approved.txt
@@ -65,7 +65,7 @@ namespace Epam.Kafka.Options
public KafkaClusterOptions() { }
public Confluent.Kafka.ClientConfig ClientConfig { get; set; }
public Confluent.SchemaRegistry.SchemaRegistryConfig SchemaRegistryConfig { get; set; }
- public Epam.Kafka.Options.KafkaClusterOptions WithOAuthHandler(System.Func createToken) { }
+ public Epam.Kafka.Options.KafkaClusterOptions WithOAuthHandler(System.Func createToken, bool throwIfAlreadySet = false) { }
public Epam.Kafka.Options.KafkaClusterOptions WithSchemaRegistryAuthenticationHeader(Confluent.SchemaRegistry.IAuthenticationHeaderValueProvider provider) { }
}
public sealed class KafkaConsumerOptions : Microsoft.Extensions.Options.IOptions
diff --git a/tests/Epam.Kafka.Tests/KafkaFactoryTests.cs b/tests/Epam.Kafka.Tests/KafkaFactoryTests.cs
index ea6e0e1..f41a7e1 100644
--- a/tests/Epam.Kafka.Tests/KafkaFactoryTests.cs
+++ b/tests/Epam.Kafka.Tests/KafkaFactoryTests.cs
@@ -108,6 +108,82 @@ public void CreateProducerConfig(string? name, string expectedGroup)
Assert.Single(config2);
}
+ [Fact]
+ public void CreateOauthConsumerCustom()
+ {
+ bool invoked = false;
+
+ var kafkaBuilder = MockCluster.AddMockCluster(this, oauth: true);
+ kafkaBuilder.WithConsumerConfig("any").Configure(x =>
+ {
+ x.ConsumerConfig.GroupId = "any";
+ x.ConsumerConfig.StatisticsIntervalMs = 5;
+ });
+
+ ConsumerConfig config = this.KafkaFactory.CreateConsumerConfig("any");
+
+ var consumer =
+ this.KafkaFactory.CreateConsumer(config,
+ configure: b =>
+ b.SetOAuthBearerTokenRefreshHandler(
+ (_, _) => { invoked = true; }));
+
+ Assert.NotNull(consumer);
+
+ consumer.Consume(1000);
+
+ Assert.True(invoked);
+ }
+
+ [Fact]
+ public void CreateOauthConsumerDefault()
+ {
+ bool invoked = false;
+
+ var kafkaBuilder = MockCluster.AddMockCluster(this, oauth: true);
+ kafkaBuilder.WithConsumerConfig("any").Configure(x =>
+ {
+ x.ConsumerConfig.GroupId = "any";
+ x.ConsumerConfig.StatisticsIntervalMs = 5;
+ });
+ kafkaBuilder.WithClusterConfig(MockCluster.ClusterName).Configure(x => x.WithOAuthHandler(_ =>
+ {
+ invoked = true;
+ throw new ArithmeticException();
+ }));
+
+ ConsumerConfig config = this.KafkaFactory.CreateConsumerConfig("any");
+
+ var consumer =
+ this.KafkaFactory.CreateConsumer(config);
+
+ Assert.NotNull(consumer);
+
+ consumer.Consume(1000);
+
+ Assert.True(invoked);
+ }
+
+ [Fact]
+ public void CreateOauthConsumerThrow()
+ {
+ var kafkaBuilder = MockCluster.AddMockCluster(this, oauth: true);
+ kafkaBuilder.WithConsumerConfig("any").Configure(x =>
+ {
+ x.ConsumerConfig.GroupId = "any";
+ x.ConsumerConfig.StatisticsIntervalMs = 5;
+ });
+ kafkaBuilder.WithClusterConfig(MockCluster.ClusterName)
+ .Configure(x => x.WithOAuthHandler(_ => throw new ArithmeticException(), true));
+
+ ConsumerConfig config = this.KafkaFactory.CreateConsumerConfig("any");
+
+ Assert.Throws(() => this.KafkaFactory.CreateConsumer(config,
+ configure: b =>
+ b.SetOAuthBearerTokenRefreshHandler(
+ (_, _) => { })));
+ }
+
[Fact]
public void CreateConfigsWithPlaceholders()
{