Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Code cleanup #28

Merged
merged 8 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,15 @@ namespace Epam.Kafka.PubSub.EntityFramework6;

internal static class CompatibilityExtensions
{
public static IQueryable<T> AsTracking<T>(this IQueryable<T> queryable) => queryable;
public static IQueryable<T> AsTracking<T>(this IQueryable<T> queryable)
{
return queryable;
}

public static int SaveChanges(this DbContext context, bool acceptAllChangesOnSuccess) => context.SaveChanges();
public static int SaveChanges(this DbContext context, bool acceptAllChangesOnSuccess)
{
return context.SaveChanges();
}

public static void Add(this DbContext context, object entity)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#if EF6
using System.Data.Entity;
using System.Data.Entity.Infrastructure;

#else
using Microsoft.EntityFrameworkCore;
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
#if EF6
using System.Data.Entity;
using System.Data.Entity.Infrastructure;

using TEntry = System.Data.Entity.Infrastructure.DbEntityEntry;

#else
using Microsoft.EntityFrameworkCore;

using TEntry = Microsoft.EntityFrameworkCore.ChangeTracking.EntityEntry;

#endif
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -103,7 +107,7 @@ protected sealed override void Callback(IReadOnlyDictionary<TEntity, IReadOnlyCo
throw;
}

foreach (var entry in exception.Entries)
foreach (TEntry? entry in exception.Entries)
{
entry.State = EntityState.Detached;

Expand All @@ -129,7 +133,7 @@ protected sealed override void Callback(IReadOnlyDictionary<TEntity, IReadOnlyCo
#if EF6
return null; // Unable to get it for EF6
#else
return entry.Metadata.FindPrimaryKey()?.Properties.Select(x => entry.CurrentValues[x]).ToArray().FirstOrDefault();
return entry.Metadata.FindPrimaryKey()?.Properties.Select(x => entry.CurrentValues[x]).ToArray().FirstOrDefault();
#endif
}
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
#if EF6
using Epam.Kafka.PubSub.EntityFramework6.Subscription;
using Epam.Kafka.PubSub.EntityFramework6.Subscription.State;

using System.Data.Entity;

#else
using Epam.Kafka.PubSub.EntityFrameworkCore.Subscription;
using Epam.Kafka.PubSub.EntityFrameworkCore.Subscription.State;

using Microsoft.EntityFrameworkCore;

#endif
using Epam.Kafka.PubSub.Subscription;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#if EF6
using System.Data.Entity;

#else
using Microsoft.EntityFrameworkCore;
#endif
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
// Copyright © 2024 EPAM Systems

using Confluent.Kafka;

using Epam.Kafka.PubSub.Subscription;

#if EF6
using Epam.Kafka.PubSub.EntityFramework6.Subscription.State;

using System.Data.Entity;
using System.Data.Entity.Infrastructure;
#else
using Epam.Kafka.PubSub.EntityFrameworkCore.Subscription.State;

using Microsoft.EntityFrameworkCore;
#endif

Expand Down Expand Up @@ -139,7 +142,7 @@ public IReadOnlyCollection<TopicPartitionOffset> CommitOrReset(
#else
proposedValues.Properties
#endif
)
)
{
proposedValues[property] = databaseValues[property];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#if EF6
using System.Data.Entity;

#else
using Microsoft.EntityFrameworkCore;
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public static ETBuilder AddKafkaState(this ModelBuilder builder)
{
throw new ArgumentNullException(nameof(builder));
}
var e = builder.Entity<KafkaTopicState>();

ETBuilder e = builder.Entity<KafkaTopicState>();

e.HasKey(x => new { x.Topic, x.Partition, x.ConsumerGroup });

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ protected PubSubHealthCheck(PubSubOptions options)
protected PubSubOptions Options { get; }
protected virtual DateTime UtcNow => DateTime.UtcNow;

public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = new ())
public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = new())
{
PipelineMonitor monitor = this.GetPipelineMonitor();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// Copyright © 2024 EPAM Systems


using System.Linq.Expressions;
using System.Text.RegularExpressions;

Expand Down
2 changes: 1 addition & 1 deletion src/Epam.Kafka.PubSub/Common/PubSubBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal PubSubBuilder(KafkaBuilder builder, string name, Type keyType, Type val
{
this.Builder = builder ?? throw new ArgumentNullException(nameof(builder));
this.Key = name ?? throw new ArgumentNullException(nameof(name));

this._options = builder.Services.AddOptions<TOptions>(this.Key)
.Configure(x =>
{
Expand Down
3 changes: 2 additions & 1 deletion src/Epam.Kafka.PubSub/KafkaBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
using Epam.Kafka.PubSub.Common.HealthChecks;
using Epam.Kafka.PubSub.Publication;
using Epam.Kafka.PubSub.Publication.Options;
using Epam.Kafka.PubSub.Publication.Pipeline;
using Epam.Kafka.PubSub.Subscription;
using Epam.Kafka.PubSub.Subscription.Options;
using Epam.Kafka.PubSub.Subscription.Pipeline;
using Epam.Kafka.PubSub.Publication.Pipeline;

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Diagnostics.HealthChecks;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// Copyright © 2024 EPAM Systems

using System.Collections.Concurrent;
using Confluent.Kafka;

using Epam.Kafka.PubSub.Common;
using Epam.Kafka.PubSub.Common.Pipeline;

using System.Collections.Concurrent;

namespace Epam.Kafka.PubSub.Publication.Pipeline;

/// <summary>
Expand Down Expand Up @@ -48,7 +49,7 @@ internal bool TryRegisterTransactionId(ProducerConfig config, out string? existi

if (!result)
{
existingName = ids[id]?.Name;
existingName = ids[id].Name;
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ protected override void ExecuteBatch(
}
catch (Exception e2)
{
AggregateException exception = new AggregateException(e1, e2);
var exception = new AggregateException(e1, e2);
exception.DoNotRetryBatch();

throw exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ public PublicationTopicWrapper(
config.SocketTimeoutMs = config.TransactionTimeoutMs - 1000;
}

if (!monitor.TryRegisterTransactionId(config, out var existing))
if (!monitor.TryRegisterTransactionId(config, out string? existing))
{
InvalidOperationException exception = new InvalidOperationException(
var exception = new InvalidOperationException(
$"Unable to use '{config.TransactionalId}' transactional.id in '{monitor.Name}' publication because it already used by '{existing}'.");
exception.DoNotRetryBatch();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright © 2024 EPAM Systems

using Confluent.Kafka;

using Epam.Kafka.PubSub.Common.Pipeline;

namespace Epam.Kafka.PubSub.Subscription.Pipeline;
Expand Down
3 changes: 2 additions & 1 deletion src/Epam.Kafka.PubSub/Subscription/State/BatchState.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright © 2024 EPAM Systems

using Confluent.Kafka;

using Epam.Kafka.PubSub.Subscription.Topics;
using Epam.Kafka.PubSub.Utils;

Expand All @@ -25,7 +26,7 @@ public IReadOnlyCollection<ConsumeResult<TKey, TValue>> GetBatch<TKey, TValue>(

cancellationToken.ThrowIfCancellationRequested();

var batch = topic.GetBatch(activitySpan, cancellationToken);
IReadOnlyCollection<ConsumeResult<TKey, TValue>> batch = topic.GetBatch(activitySpan, cancellationToken);

// try to throw handler assign exception after read to be able to do it after potential re-balance in same batch.
topic.ThrowIfNeeded();
Expand Down
3 changes: 2 additions & 1 deletion src/Epam.Kafka.PubSub/Subscription/State/CombinedState.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright © 2024 EPAM Systems

using Confluent.Kafka;

using Epam.Kafka.PubSub.Subscription.Topics;
using Epam.Kafka.PubSub.Utils;

Expand Down Expand Up @@ -50,7 +51,7 @@ protected override void AssignConsumer<TKey, TValue>(SubscriptionTopicWrapper<TK
topic.OnReset(reset);
}
}

protected override IReadOnlyCollection<TopicPartitionOffset> CommitState<TKey, TValue>(
SubscriptionTopicWrapper<TKey, TValue> topic,
IReadOnlyCollection<TopicPartitionOffset> offsets,
Expand Down
3 changes: 2 additions & 1 deletion src/Epam.Kafka.PubSub/Subscription/State/ExternalState.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright © 2024 EPAM Systems

using Confluent.Kafka;

using Epam.Kafka.PubSub.Subscription.Options;
using Epam.Kafka.PubSub.Subscription.Topics;
using Epam.Kafka.PubSub.Utils;
Expand Down Expand Up @@ -30,7 +31,7 @@ protected override void AssignConsumer<TKey, TValue>(SubscriptionTopicWrapper<TK

var reset = new List<TopicPartitionOffset>();
var assign = new List<TopicPartitionOffset>();

foreach (TopicPartitionOffset item in state)
{
// existing assignment, check if offset reset
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright © 2024 EPAM Systems

using Confluent.Kafka;

using Epam.Kafka.PubSub.Subscription.Topics;
using Epam.Kafka.PubSub.Utils;

Expand Down Expand Up @@ -37,7 +38,9 @@ public static IReadOnlyCollection<TopicPartitionOffset> CommitState<TKey, TValue

// compare actual and expected value to understand if offset was committed or reset
if (expected.Offset == item.Offset)
{
committed.Add(expected);
}
else
{
TopicPartitionOffset tpo = new(item.TopicPartition, item.Offset);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright © 2024 EPAM Systems

using Confluent.Kafka;

using Epam.Kafka.PubSub.Subscription.Options;
using Epam.Kafka.PubSub.Subscription.Topics;
using Epam.Kafka.PubSub.Utils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public SubscriptionBuilder<TKey, TValue, THandler> WithKeyDeserializer(

return this;
}

/// <summary>
/// Set factory for creation of value deserializer for consumer.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ protected virtual void ProcessBatch(IDictionary<ConsumeResult<TKey, TValue>, str
throw new ArgumentNullException(nameof(items));
}

var filtered = items.Where(x => x.Value == null);
IEnumerable<KeyValuePair<ConsumeResult<TKey, TValue>, string?>> filtered = items.Where(x => x.Value == null);

#if !NET6_0_OR_GREATER
filtered = filtered.ToArray(); // to prevent collection modification
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public IReadOnlyCollection<TopicPartitionOffset> GetAndResetState(

if (state.Any(x => x.Offset == Offset.Unset))
{
state = this.AutoResetOffsets(state, out var toCommit);
state = this.AutoResetOffsets(state, out List<TopicPartitionOffset>? toCommit);

if (toCommit.Count > 0)
{
Expand All @@ -118,13 +118,13 @@ private List<TopicPartitionOffset> AutoResetOffsets(IReadOnlyCollection<TopicPar
toReset = new List<TopicPartitionOffset>(offsets.Count);
List<TopicPartitionOffset> result = new(offsets.Count);

foreach (var tpo in offsets)
foreach (TopicPartitionOffset tpo in offsets)
{
if (tpo.Offset == Offset.Unset)
{
TopicPartition topicPartition = tpo.TopicPartition;

var q = this.Consumer.QueryWatermarkOffsets(topicPartition, TimeSpan.FromSeconds(5));
WatermarkOffsets q = this.Consumer.QueryWatermarkOffsets(topicPartition, TimeSpan.FromSeconds(5));

switch (this._autoOffsetReset)
{
Expand Down Expand Up @@ -271,9 +271,9 @@ private void OnPartitionsAssigned(IConsumer<TKey, TValue> c, List<TopicPartition
#pragma warning disable CA1031 // can't throw exceptions in handler callback because it triggers incorrect state in librdkafka and some times leads to app crash.
try
{
var state = this.ExternalState.Invoke(tp);
IReadOnlyCollection<TopicPartitionOffset> state = this.ExternalState.Invoke(tp);

foreach (var tpo in state)
foreach (TopicPartitionOffset tpo in state)
{
this.Offsets[tpo.TopicPartition] = tpo.Offset;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Epam.Kafka.PubSub/Utils/ActivityWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void SetResult(object? value)

this._activity.SetTag(Result, description);
}

this._activity.SetStatus(ActivityStatusCode.Ok, description);
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/Epam.Kafka/ISharedClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Epam.Kafka;
/// <summary>
/// Extend <see cref="IClient"/> by adding <see cref="IObservable{T}"/> for <see cref="Error"/> and <see cref="Statistics"/>.
/// </summary>
public interface ISharedClient: IObservable<Error>, IObservable<Statistics>, IClient
public interface ISharedClient : IObservable<Error>, IObservable<Statistics>, IClient
{

}
5 changes: 4 additions & 1 deletion src/Epam.Kafka/Internals/CompatibilityExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,19 @@ public static string Replace(this string str,
// Same as original .NET C# string.Replace behavior.
throw new ArgumentNullException(nameof(str));
}

if (oldValue == null)
{
// Same as original .NET C# string.Replace behavior.
throw new ArgumentNullException(nameof(oldValue));
}

if (oldValue.Length == 0)
{
// Same as original .NET C# string.Replace behavior.
throw new ArgumentException("String cannot be of zero length.");
}

if (str.Length == 0)
{
// Same as original .NET C# string.Replace behavior.
Expand All @@ -49,7 +52,7 @@ public static string Replace(this string str,

// Prepare string builder for storing the processed string.
// Note: StringBuilder has a better performance than String by 30-40%.
StringBuilder resultStringBuilder = new StringBuilder(str.Length);
var resultStringBuilder = new StringBuilder(str.Length);

// Analyze the replacement: replace or remove.
bool isReplacementNullOrEmpty = string.IsNullOrEmpty(newValue);
Expand Down
Loading