Commit

Logging for schema registry client creation #41

IharYakimush authored Dec 2, 2024
1 parent 3265eb7 commit 1997f10

Showing 25 changed files with 102 additions and 50 deletions.
6 changes: 5 additions & 1 deletion .editorconfig
@@ -62,6 +62,7 @@ dotnet_diagnostic.CA1507.severity = warning
dotnet_diagnostic.CA1805.severity = warning
csharp_style_prefer_primary_constructors = false:suggestion
csharp_style_prefer_readonly_struct_member = true:suggestion
csharp_prefer_system_threading_lock = true:suggestion
[*.{cs,vb}]
#### Naming styles ####

@@ -143,6 +144,9 @@ dotnet_style_qualification_for_field = true:suggestion
dotnet_style_qualification_for_property = true:suggestion
dotnet_style_qualification_for_method = true:suggestion
dotnet_style_qualification_for_event = true:suggestion
csharp_style_unused_value_expression_statement_preference = discard_variable:none
dotnet_diagnostic.IDE0055.severity = none
dotnet_diagnostic.IDE0057.severity = none
dotnet_diagnostic.CA1051.severity = warning
dotnet_diagnostic.CA1068.severity = warning
dotnet_diagnostic.CA1069.severity = error
@@ -178,7 +182,7 @@ dotnet_diagnostic.CA3075.severity = warning
dotnet_diagnostic.CA2201.severity = warning
dotnet_diagnostic.CA2219.severity = warning
dotnet_diagnostic.CA2251.severity = suggestion
dotnet_style_prefer_collection_expression = false:warning
dotnet_style_prefer_collection_expression = never:warning
dotnet_diagnostic.CA1510.severity = none
dotnet_diagnostic.CA1511.severity = none
dotnet_diagnostic.CA1512.severity = none
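
Note on the csharp_prefer_system_threading_lock rule added above: when the project targets .NET 9, the analyzer suggests using the dedicated System.Threading.Lock type instead of locking on a plain object field. A minimal sketch of the preferred pattern (illustrative member names, not taken from this repository):

    using System.Threading;

    internal sealed class Counter
    {
        // .NET 9 lock type instead of `private readonly object _syncObj = new();`
        private readonly Lock _syncObj = new();
        private int _count;

        public void Increment()
        {
            // The C# 13 lock statement recognizes System.Threading.Lock and enters/exits its scope.
            lock (this._syncObj)
            {
                this._count++;
            }
        }
    }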
4 changes: 2 additions & 2 deletions .github/workflows/dotnet.yml
@@ -32,7 +32,7 @@ jobs:
- name: Setup .NET
uses: actions/setup-dotnet@v4
with:
dotnet-version: 8.0.x
dotnet-version: 9.0.x
- name: Restore
run: dotnet restore
- name: Build
@@ -85,7 +85,7 @@ jobs:
- name: Setup .NET
uses: actions/setup-dotnet@v4
with:
dotnet-version: 8.0.x
dotnet-version: 9.0.x
source-url: https://api.nuget.org/v3/index.json
env:
NUGET_AUTH_TOKEN: ${{secrets.NUGET_TOKEN}}
8 changes: 4 additions & 4 deletions sample/Epam.Kafka.Sample/Epam.Kafka.Sample.csproj
@@ -20,10 +20,10 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" Version="8.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.7.0" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.7.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" Version="8.0.11" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.10.0" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.10.0" />

</ItemGroup>

2 changes: 1 addition & 1 deletion src/Directory.Build.props
@@ -25,7 +25,7 @@
<NeutralLanguage>en</NeutralLanguage>
<PackageReleaseNotes>https://github.com/epam/epam-kafka/releases</PackageReleaseNotes>
<VersionPrefix>0</VersionPrefix>
<Version>2.3.$(VersionPrefix)</Version>
<Version>2.4.$(VersionPrefix)</Version>
</PropertyGroup>

<ItemGroup>
@@ -13,7 +13,7 @@ public static IQueryable<T> AsTracking<T>(this IQueryable<T> queryable)
return queryable;
}

public static int SaveChanges(this DbContext context, bool acceptAllChangesOnSuccess)
public static int SaveChanges(this DbContext context, bool _)
{
return context.SaveChanges();
}
@@ -10,11 +10,11 @@
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net6.0'">
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="6.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="6.0.36" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net8.0'">
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="8.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="8.0.11" />
</ItemGroup>

<ItemGroup>
@@ -84,10 +84,12 @@ protected override void TransactionCommitted(IReadOnlyCollection<TEntity> entiti
}
catch (DbUpdateConcurrencyException exception)
{
#pragma warning disable IDE0008 // Use explicit type not possible due to #if directives
foreach (var entry in exception.Entries)
{
this.Logger.PublicationEntityDetached(exception, "Commit", this.FindPrimaryKeyForLogs(entry), typeof(TEntity));
}
#pragma warning restore IDE0008
}
}

@@ -127,6 +127,7 @@ public IReadOnlyCollection<TopicPartitionOffset> CommitOrReset(
{
cancellationToken.ThrowIfCancellationRequested();

#pragma warning disable IDE0008 // Use explicit type not possible due to #if directives
foreach (var entry in exception.Entries)
{
if (entry.Entity is KafkaTopicState)
@@ -152,6 +153,7 @@
}
}
}
#pragma warning restore IDE0008

this._context.SaveChanges(true);
}
@@ -1,5 +1,8 @@
// Copyright © 2024 EPAM Systems

#pragma warning disable IDE0065
#pragma warning disable IDE0008

#if EF6
namespace Epam.Kafka.PubSub.EntityFramework6.Subscription.State;
#else
@@ -45,4 +48,7 @@ public static ETBuilder AddKafkaState(this ModelBuilder builder)

return e;
}
}
}

#pragma warning restore IDE0065
#pragma warning restore IDE0008
@@ -52,7 +52,13 @@ protected PubSubHealthCheck(PubSubOptions options)
default:
throw new InvalidOperationException(
$"Unknown pipeline status {monitor.Pipeline.Value}. Supported values: " +
string.Join(", ", Enum.GetNames(typeof(PipelineStatus))));
string.Join(", ",
#if NET6_0_OR_GREATER
Enum.GetNames<PipelineStatus>()
#else
Enum.GetNames(typeof(PipelineStatus))
#endif
));
}

return Task.FromResult(new HealthCheckResult(
2 changes: 1 addition & 1 deletion src/Epam.Kafka.PubSub/Common/PubSubBackgroundService.cs
@@ -59,7 +59,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await Task.Yield();

if (this.Options.Enabled == false)
if (!this.Options.Enabled)
{
this.Monitor.Pipeline.Update(PipelineStatus.Disabled);
return;
@@ -16,7 +16,7 @@ public ValidateOptionsResult Validate(string? name, PublicationOptions options)
throw new ArgumentNullException(nameof(options));
}

if (options.Enabled == false)
if (!options.Enabled)
{
return ValidateOptionsResult.Success;
}
@@ -17,7 +17,7 @@ public ValidateOptionsResult Validate(string? name, SubscriptionOptions options)
throw new ArgumentNullException(nameof(options));
}

if (options.Enabled == false)
if (!options.Enabled)
{
return ValidateOptionsResult.Success;
}
@@ -1,7 +1,5 @@
// Copyright © 2024 EPAM Systems

using Confluent.Kafka;

using Epam.Kafka.PubSub.Common.Pipeline;

namespace Epam.Kafka.PubSub.Subscription.Pipeline;
2 changes: 1 addition & 1 deletion src/Epam.Kafka.PubSub/Subscription/State/ExternalState.cs
@@ -40,7 +40,7 @@ protected override void AssignConsumer<TKey, TValue>(SubscriptionTopicWrapper<TK
// existing assignment, check if offset reset
if (topic.Consumer.Assignment.Contains(item.TopicPartition))
{
if (topic.TryGetOffset(item.TopicPartition, out var tp) && tp != item.Offset)
if (topic.TryGetOffset(item.TopicPartition, out Offset tp) && tp != item.Offset)
{
ExternalStateExtensions.PauseOrReset(topic, item, pause, reset);
}
@@ -62,8 +62,8 @@ public static IReadOnlyCollection<TopicPartitionOffset> CommitState<TKey, TValue
}

public static void PauseOrReset<TKey, TValue>(
SubscriptionTopicWrapper<TKey, TValue> topic,
TopicPartitionOffset item,
SubscriptionTopicWrapper<TKey, TValue> topic,
TopicPartitionOffset item,
List<TopicPartition> pause,
List<TopicPartitionOffset> reset)
{
@@ -112,7 +112,7 @@ public static void CommitOffsetIfNeeded<TKey, TValue>(
}
else if (item.Offset == Offset.End)
{
var w = topic.Consumer.QueryWatermarkOffsets(item.TopicPartition, TimeSpan.FromSeconds(5));
WatermarkOffsets w = topic.Consumer.QueryWatermarkOffsets(item.TopicPartition, TimeSpan.FromSeconds(5));
toCommit.Add(new TopicPartitionOffset(item.TopicPartition, w.High));
}
}
@@ -101,7 +101,7 @@ protected override void ExecuteBatch(
this.Monitor.Batch.Update(BatchStatus.Reading);

bool unassignedBeforeRead = state.GetBatch(
topic, activitySpan, out var batch, cancellationToken);
topic, activitySpan, out IReadOnlyCollection<ConsumeResult<TKey, TValue>> batch, cancellationToken);

cancellationToken.ThrowIfCancellationRequested();

@@ -79,7 +79,10 @@ public SubscriptionTopicWrapper(IKafkaFactory kafkaFactory,

public Func<IReadOnlyCollection<TopicPartition>, IReadOnlyCollection<TopicPartitionOffset>>? ExternalState { get; set; }

public bool TryGetOffset(TopicPartition tp, out Offset result) => this._offsets.TryGetValue(tp, out result);
public bool TryGetOffset(TopicPartition tp, out Offset result)
{
return this._offsets.TryGetValue(tp, out result);
}

public void OnAssign(IReadOnlyCollection<TopicPartitionOffset> items)
{
@@ -229,7 +232,7 @@ public void OnReset(IReadOnlyCollection<TopicPartitionOffset> items)
List<TopicPartitionOffset> reset = new(items.Count);
List<TopicPartitionOffset> resume = new(items.Count);

foreach (var tpo in items)
foreach (TopicPartitionOffset tpo in items)
{
if (this._paused.Remove(tpo.TopicPartition))
{
@@ -285,7 +288,7 @@ private bool OnPauseEnumerate(IEnumerable<TopicPartition> items)
throw;
}

foreach (var r in result)
foreach (TopicPartition r in result)
{
this._paused.Add(r);
this._offsets[r] = ExternalOffset.Paused;
@@ -485,6 +488,7 @@ private void ReadToBuffer(ActivityWrapper span, CancellationToken cancellationTo
{
ConsumeResult<byte[], byte[]> record = consumeException.ConsumerRecord;

#pragma warning disable IDE0010 // Add missing cases
switch (consumeException.Error.Code)
{
case ErrorCode.Local_Fatal:
@@ -507,6 +511,7 @@ private void ReadToBuffer(ActivityWrapper span, CancellationToken cancellationTo
throw;
}
}
#pragma warning restore IDE0010 // Add missing cases

// unable to return at least something, so only throw
if (this._buffer.Count == 0)
1 change: 1 addition & 0 deletions src/Epam.Kafka.PubSub/Utils/ActivityWrapper.cs
@@ -5,6 +5,7 @@
using System.Text;

namespace Epam.Kafka.PubSub.Utils;

#pragma warning disable CA2000 // StartActivity returns same activity object, so it will be disposed.
internal sealed class ActivityWrapper : IDisposable
{
46 changes: 31 additions & 15 deletions src/Epam.Kafka/Internals/KafkaFactory.cs
@@ -220,15 +220,16 @@ public IClient GetOrCreateClient(string? cluster = null)

KafkaClusterOptions clusterOptions = this.GetAndValidateClusterOptions(cluster);

SharedClient? result;

lock (this._syncObj)
if (!this._clients.TryGetValue(clusterOptions, out SharedClient? result))
{
if (!this._clients.TryGetValue(clusterOptions, out result))
lock (this._syncObj)
{
result = new SharedClient(this, cluster);
if (!this._clients.TryGetValue(clusterOptions, out result))
{
result = new SharedClient(this, cluster);

this._clients.Add(clusterOptions, result);
this._clients.Add(clusterOptions, result);
}
}
}

@@ -241,16 +242,31 @@ public ISchemaRegistryClient GetOrCreateSchemaRegistryClient(string? cluster = n

KafkaClusterOptions clusterOptions = this.GetAndValidateClusterOptions(cluster);

CachedSchemaRegistryClient? result;

lock (this._syncObj)
if (!this._registries.TryGetValue(clusterOptions, out CachedSchemaRegistryClient? result))
{
if (!this._registries.TryGetValue(clusterOptions, out result))
{
result = new CachedSchemaRegistryClient(clusterOptions.SchemaRegistryConfig,
clusterOptions.AuthenticationHeaderValueProvider);
ILogger logger = this._loggerFactory.CreateLogger(LoggerCategoryName);

this._registries.Add(clusterOptions, result);
lock (this._syncObj)
{
if (!this._registries.TryGetValue(clusterOptions, out result))
{
try
{
result = new CachedSchemaRegistryClient(clusterOptions.SchemaRegistryConfig,
clusterOptions.AuthenticationHeaderValueProvider);

this._registries.Add(clusterOptions, result);

logger.RegistryClientCreateOk(PrepareConfigForLogs(clusterOptions.SchemaRegistryConfig),
clusterOptions.AuthenticationHeaderValueProvider?.GetType());
}
catch (Exception exception)
{
logger.RegistryClientCreateError(exception, clusterOptions.SchemaRegistryConfig,
clusterOptions.AuthenticationHeaderValueProvider?.GetType());
throw;
}
}
}
}

@@ -265,7 +281,7 @@ private static void ValidateLogicalName(string? configName, string entityType)
}
}

private static IEnumerable<KeyValuePair<string, string>> PrepareConfigForLogs(Config config)
private static IEnumerable<KeyValuePair<string, string>> PrepareConfigForLogs(IEnumerable<KeyValuePair<string, string>> config)
{
return config.Select(x => Contains(x, "password") || Contains(x, "secret")
? new KeyValuePair<string, string>(x.Key, "*******")
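
The hunks above restructure both GetOrCreateClient and GetOrCreateSchemaRegistryClient to check the cache before taking the lock (double-checked locking), and wrap schema registry client creation in a try/catch that logs success or failure, with password and secret values masked by PrepareConfigForLogs. A hedged usage sketch of the factory from consuming code; the constructor-injection shape, class name, and the "sandbox" cluster name are illustrative assumptions, not taken from this commit:

    using Confluent.SchemaRegistry;
    using Epam.Kafka;

    public sealed class SchemaRegistryProbe
    {
        private readonly ISchemaRegistryClient _registry;

        public SchemaRegistryProbe(IKafkaFactory kafkaFactory)
        {
            // The first call for a cluster creates the client and emits the
            // "Registry client created" (or "Registry client create error") log entry;
            // later calls return the cached instance without further logging.
            this._registry = kafkaFactory.GetOrCreateSchemaRegistryClient("sandbox");
        }
    }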
15 changes: 15 additions & 0 deletions src/Epam.Kafka/LogExtensions.cs
@@ -85,4 +85,19 @@ internal static partial void ProducerCreateOk(this ILogger logger, IEnumerable<K
Message = "Producer create error. KeyType: {KeyType}, ValueType: {ValueType}, Config: {Config}.")]
internal static partial void ProducerCreateError(this ILogger logger, Exception exception,
IEnumerable<KeyValuePair<string, string>> config, Type keyType, Type valueType);

[LoggerMessage(
EventId = 203,
EventName = "RegistryClientCreated",
Level = LogLevel.Information,
Message = "Registry client created. HeaderProviderType: {HeaderProviderType}, Config: {Config}.")]
internal static partial void RegistryClientCreateOk(this ILogger logger, IEnumerable<KeyValuePair<string, string>> config, Type? headerProviderType);

[LoggerMessage(
EventId = 503,
EventName = "RegistryClientCreateError",
Level = LogLevel.Error,
Message = "Registry client create error. HeaderProviderType: {HeaderProviderType}, Config: {Config}.")]
internal static partial void RegistryClientCreateError(this ILogger logger, Exception exception,
IEnumerable<KeyValuePair<string, string>> config, Type? headerProviderType);
}
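
The two new entries follow the library's existing LoggerMessage pattern: the logging source generator emits the partial method bodies at compile time, producing strongly typed, low-allocation log calls with stable event ids (203 for success, 503 for failure). A minimal, self-contained sketch of the same pattern with illustrative names that are not part of this commit:

    using Microsoft.Extensions.Logging;

    internal static partial class SampleLogExtensions
    {
        // The generator supplies the body of this partial method at compile time.
        [LoggerMessage(
            EventId = 1,
            EventName = "SampleCreated",
            Level = LogLevel.Information,
            Message = "Sample created. Name: {Name}.")]
        internal static partial void SampleCreated(this ILogger logger, string name);
    }

    // Usage: logger.SampleCreated("demo");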
@@ -1,9 +1,5 @@
// Copyright © 2024 EPAM Systems

#if !NET6_0_OR_GREATER
using Epam.Kafka.Internals;
#endif

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;

@@ -35,7 +31,7 @@ public void Configure(string? name, TOptions options)
return;
}

IConfigurationSection section = this._configuration.GetSection(this.ParentSectionName).GetSection(name);
IConfigurationSection section = this._configuration.GetSection(this.ParentSectionName).GetSection(name!);

if (section.Exists())
{