Skip to content

Commit

Permalink
improvements and fixes:
Browse files Browse the repository at this point in the history
System.InvalidOperationException: Operations that change non-concurrent collections must have exclusive access #40
Logging for schema registry client creation #41
Optionally throw exception if OAuth handler already set and callback arguments nullability. #44

consumer and producer warmup in pub/sub background services
  • Loading branch information
IharYakimush committed Dec 2, 2024
1 parent 08aa885 commit a585236
Show file tree
Hide file tree
Showing 34 changed files with 291 additions and 99 deletions.
6 changes: 5 additions & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ dotnet_diagnostic.CA1507.severity = warning
dotnet_diagnostic.CA1805.severity = warning
csharp_style_prefer_primary_constructors = false:suggestion
csharp_style_prefer_readonly_struct_member = true:suggestion
csharp_prefer_system_threading_lock = true:suggestion
[*.{cs,vb}]
#### Naming styles ####

Expand Down Expand Up @@ -143,6 +144,9 @@ dotnet_style_qualification_for_field = true:suggestion
dotnet_style_qualification_for_property = true:suggestion
dotnet_style_qualification_for_method = true:suggestion
dotnet_style_qualification_for_event = true:suggestion
csharp_style_unused_value_expression_statement_preference = discard_variable:none
dotnet_diagnostic.IDE0055.severity = none
dotnet_diagnostic.IDE0057.severity = none
dotnet_diagnostic.CA1051.severity = warning
dotnet_diagnostic.CA1068.severity = warning
dotnet_diagnostic.CA1069.severity = error
Expand Down Expand Up @@ -178,7 +182,7 @@ dotnet_diagnostic.CA3075.severity = warning
dotnet_diagnostic.CA2201.severity = warning
dotnet_diagnostic.CA2219.severity = warning
dotnet_diagnostic.CA2251.severity = suggestion
dotnet_style_prefer_collection_expression = false:warning
dotnet_style_prefer_collection_expression = never:warning
dotnet_diagnostic.CA1510.severity = none
dotnet_diagnostic.CA1511.severity = none
dotnet_diagnostic.CA1512.severity = none
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/dotnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
- name: Setup .NET
uses: actions/setup-dotnet@v4
with:
dotnet-version: 8.0.x
dotnet-version: 9.0.x
- name: Restore
run: dotnet restore
- name: Build
Expand Down Expand Up @@ -85,7 +85,7 @@ jobs:
- name: Setup .NET
uses: actions/setup-dotnet@v4
with:
dotnet-version: 8.0.x
dotnet-version: 9.0.x
source-url: https://api.nuget.org/v3/index.json
env:
NUGET_AUTH_TOKEN: ${{secrets.NUGET_TOKEN}}
Expand Down
6 changes: 5 additions & 1 deletion sample/Epam.Kafka.Sample.Net462/App.config
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
</dependentAssembly>
<dependentAssembly>
<assemblyIdentity name="System.Memory" publicKeyToken="cc7b13ffcd2ddd51" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-4.0.1.1" newVersion="4.0.1.1" />
<bindingRedirect oldVersion="0.0.0.0-4.0.1.2" newVersion="4.0.1.2" />
</dependentAssembly>
<dependentAssembly>
<assemblyIdentity name="System.Text.Json" publicKeyToken="cc7b13ffcd2ddd51" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-8.0.0.5" newVersion="8.0.0.5" />
</dependentAssembly>
</assemblyBinding>
</runtime>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@
<HintPath>packages\System.Text.Encodings.Web.8.0.0\lib\net462\System.Text.Encodings.Web.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="System.Text.Json, Version=8.0.0.4, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51">
<HintPath>packages\System.Text.Json.8.0.4\lib\net462\System.Text.Json.dll</HintPath>
<Reference Include="System.Text.Json, Version=8.0.0.5, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51">
<HintPath>packages\System.Text.Json.8.0.5\lib\net462\System.Text.Json.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="System.Threading.Tasks.Extensions, Version=4.2.0.1, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51, processorArchitecture=MSIL">
Expand Down
2 changes: 1 addition & 1 deletion sample/Epam.Kafka.Sample.Net462/packages.config
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
<package id="System.Security.Cryptography.Primitives" version="4.3.0" targetFramework="net462" />
<package id="System.Security.Cryptography.X509Certificates" version="4.3.0" targetFramework="net462" />
<package id="System.Text.Encodings.Web" version="8.0.0" targetFramework="net462" />
<package id="System.Text.Json" version="8.0.4" targetFramework="net462" />
<package id="System.Text.Json" version="8.0.5" targetFramework="net462" />
<package id="System.Threading.Tasks.Extensions" version="4.5.4" targetFramework="net462" />
<package id="System.ValueTuple" version="4.5.0" targetFramework="net462" />
</packages>
8 changes: 4 additions & 4 deletions sample/Epam.Kafka.Sample/Epam.Kafka.Sample.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" Version="8.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.7.0" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.7.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" Version="8.0.11" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.10.0" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.10.0" />

</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<NeutralLanguage>en</NeutralLanguage>
<PackageReleaseNotes>https://github.com/epam/epam-kafka/releases</PackageReleaseNotes>
<VersionPrefix>0</VersionPrefix>
<Version>2.3.$(VersionPrefix)</Version>
<Version>2.4.$(VersionPrefix)</Version>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static IQueryable<T> AsTracking<T>(this IQueryable<T> queryable)
return queryable;
}

public static int SaveChanges(this DbContext context, bool acceptAllChangesOnSuccess)
public static int SaveChanges(this DbContext context, bool _)
{
return context.SaveChanges();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net6.0'">
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="6.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="6.0.36" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net8.0'">
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="8.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="8.0.11" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,12 @@ protected override void TransactionCommitted(IReadOnlyCollection<TEntity> entiti
}
catch (DbUpdateConcurrencyException exception)
{
#pragma warning disable IDE0008 // Use explicit type not possible due to #if directives
foreach (var entry in exception.Entries)
{
this.Logger.PublicationEntityDetached(exception, "Commit", this.FindPrimaryKeyForLogs(entry), typeof(TEntity));
}
#pragma warning restore IDE0008
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public IReadOnlyCollection<TopicPartitionOffset> CommitOrReset(
{
cancellationToken.ThrowIfCancellationRequested();

#pragma warning disable IDE0008 // Use explicit type not possible due to #if directives
foreach (var entry in exception.Entries)
{
if (entry.Entity is KafkaTopicState)
Expand All @@ -152,6 +153,7 @@ public IReadOnlyCollection<TopicPartitionOffset> CommitOrReset(
}
}
}
#pragma warning restore IDE0008

this._context.SaveChanges(true);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
// Copyright © 2024 EPAM Systems

#pragma warning disable IDE0065
#pragma warning disable IDE0008

#if EF6
namespace Epam.Kafka.PubSub.EntityFramework6.Subscription.State;
#else
Expand Down Expand Up @@ -45,4 +48,7 @@ public static ETBuilder AddKafkaState(this ModelBuilder builder)

return e;
}
}
}

#pragma warning restore IDE0065
#pragma warning restore IDE0008
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ protected PubSubHealthCheck(PubSubOptions options)
default:
throw new InvalidOperationException(
$"Unknown pipeline status {monitor.Pipeline.Value}. Supported values: " +
string.Join(", ", Enum.GetNames(typeof(PipelineStatus))));
string.Join(", ",
#if NET6_0_OR_GREATER
Enum.GetNames<PipelineStatus>()
#else
Enum.GetNames(typeof(PipelineStatus))
#endif
));
}

return Task.FromResult(new HealthCheckResult(
Expand Down
2 changes: 1 addition & 1 deletion src/Epam.Kafka.PubSub/Common/PubSubBackgroundService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await Task.Yield();

if (this.Options.Enabled == false)
if (!this.Options.Enabled)
{
this.Monitor.Pipeline.Update(PipelineStatus.Disabled);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public ValidateOptionsResult Validate(string? name, PublicationOptions options)
throw new ArgumentNullException(nameof(options));
}

if (options.Enabled == false)
if (!options.Enabled)
{
return ValidateOptionsResult.Success;
}
Expand Down
14 changes: 10 additions & 4 deletions src/Epam.Kafka.PubSub/Publication/PublicationBackgroundService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,22 @@ protected override IPublicationTopicWrapper<TKey, TValue> CreateTopicWrapper()

bool implicitPreprocessor = ks != null || vs != null || config.TransactionalId != null;

IPublicationTopicWrapper<TKey, TValue> result;

if (this.Options.SerializationPreprocessor ?? implicitPreprocessor)
{
return new PublicationSerializeKeyAndValueTopicWrapper<TKey, TValue>(this.KafkaFactory, this.Monitor,
result = new PublicationSerializeKeyAndValueTopicWrapper<TKey, TValue>(this.KafkaFactory, this.Monitor,
config, this.Options, this.Logger,
ks, vs, this.Options.Partitioner);
}
else
{
result = new PublicationTopicWrapper<TKey, TValue>(this.KafkaFactory, this.Monitor, config, this.Options,
this.Logger,
ks, vs, this.Options.Partitioner);
}

return new PublicationTopicWrapper<TKey, TValue>(this.KafkaFactory, this.Monitor, config, this.Options,
this.Logger,
ks, vs, this.Options.Partitioner);
return result;
}

protected override TimeSpan? GetBatchFinishedTimeout(PublicationBatchResult subBatchResult)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ public PublicationTopicWrapper(
b.SetValueSerializer(valueSerializer);
}
});

// warmup to avoid potential issues with OAuth handler
this.Producer.Poll(TimeSpan.Zero);
}

private static void ConfigureReports(ProducerConfig config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public ValidateOptionsResult Validate(string? name, SubscriptionOptions options)
throw new ArgumentNullException(nameof(options));
}

if (options.Enabled == false)
if (!options.Enabled)
{
return ValidateOptionsResult.Success;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
// Copyright © 2024 EPAM Systems

using Confluent.Kafka;

using Epam.Kafka.PubSub.Common.Pipeline;

namespace Epam.Kafka.PubSub.Subscription.Pipeline;
Expand Down
2 changes: 1 addition & 1 deletion src/Epam.Kafka.PubSub/Subscription/State/ExternalState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected override void AssignConsumer<TKey, TValue>(SubscriptionTopicWrapper<TK
// existing assignment, check if offset reset
if (topic.Consumer.Assignment.Contains(item.TopicPartition))
{
if (topic.TryGetOffset(item.TopicPartition, out var tp) && tp != item.Offset)
if (topic.TryGetOffset(item.TopicPartition, out Offset tp) && tp != item.Offset)
{
ExternalStateExtensions.PauseOrReset(topic, item, pause, reset);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ public static IReadOnlyCollection<TopicPartitionOffset> CommitState<TKey, TValue
}

public static void PauseOrReset<TKey, TValue>(
SubscriptionTopicWrapper<TKey, TValue> topic,
TopicPartitionOffset item,
SubscriptionTopicWrapper<TKey, TValue> topic,
TopicPartitionOffset item,
List<TopicPartition> pause,
List<TopicPartitionOffset> reset)
{
Expand Down Expand Up @@ -112,7 +112,7 @@ public static void CommitOffsetIfNeeded<TKey, TValue>(
}
else if (item.Offset == Offset.End)
{
var w = topic.Consumer.QueryWatermarkOffsets(item.TopicPartition, TimeSpan.FromSeconds(5));
WatermarkOffsets w = topic.Consumer.QueryWatermarkOffsets(item.TopicPartition, TimeSpan.FromSeconds(5));
toCommit.Add(new TopicPartitionOffset(item.TopicPartition, w.High));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ protected override void ExecuteBatch(
this.Monitor.Batch.Update(BatchStatus.Reading);

bool unassignedBeforeRead = state.GetBatch(
topic, activitySpan, out var batch, cancellationToken);
topic, activitySpan, out IReadOnlyCollection<ConsumeResult<TKey, TValue>> batch, cancellationToken);

cancellationToken.ThrowIfCancellationRequested();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ public SubscriptionTopicWrapper(IKafkaFactory kafkaFactory,

this.ConfigureConsumerBuilder(b);
});

// Warmup consumer to avoid potential issues with OAuth handler.
// Consumer just created, so not assigned to any partition.
this.Consumer.Consume(100);
}

public SubscriptionMonitor Monitor { get; }
Expand All @@ -79,7 +83,10 @@ public SubscriptionTopicWrapper(IKafkaFactory kafkaFactory,

public Func<IReadOnlyCollection<TopicPartition>, IReadOnlyCollection<TopicPartitionOffset>>? ExternalState { get; set; }

public bool TryGetOffset(TopicPartition tp, out Offset result) => this._offsets.TryGetValue(tp, out result);
public bool TryGetOffset(TopicPartition tp, out Offset result)
{
return this._offsets.TryGetValue(tp, out result);
}

public void OnAssign(IReadOnlyCollection<TopicPartitionOffset> items)
{
Expand Down Expand Up @@ -229,7 +236,7 @@ public void OnReset(IReadOnlyCollection<TopicPartitionOffset> items)
List<TopicPartitionOffset> reset = new(items.Count);
List<TopicPartitionOffset> resume = new(items.Count);

foreach (var tpo in items)
foreach (TopicPartitionOffset tpo in items)
{
if (this._paused.Remove(tpo.TopicPartition))
{
Expand Down Expand Up @@ -285,7 +292,7 @@ private bool OnPauseEnumerate(IEnumerable<TopicPartition> items)
throw;
}

foreach (var r in result)
foreach (TopicPartition r in result)
{
this._paused.Add(r);
this._offsets[r] = ExternalOffset.Paused;
Expand Down Expand Up @@ -485,6 +492,7 @@ private void ReadToBuffer(ActivityWrapper span, CancellationToken cancellationTo
{
ConsumeResult<byte[], byte[]> record = consumeException.ConsumerRecord;

#pragma warning disable IDE0010 // Add missing cases
switch (consumeException.Error.Code)
{
case ErrorCode.Local_Fatal:
Expand All @@ -507,6 +515,7 @@ private void ReadToBuffer(ActivityWrapper span, CancellationToken cancellationTo
throw;
}
}
#pragma warning restore IDE0010 // Add missing cases

// unable to return at least something, so only throw
if (this._buffer.Count == 0)
Expand Down
1 change: 1 addition & 0 deletions src/Epam.Kafka.PubSub/Utils/ActivityWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Text;

namespace Epam.Kafka.PubSub.Utils;

#pragma warning disable CA2000 // StartActivity returns same activity object, so it will be disposed.
internal sealed class ActivityWrapper : IDisposable
{
Expand Down
Loading

0 comments on commit a585236

Please sign in to comment.