diff --git a/tests/Epam.Kafka.PubSub.Tests/Publication/PubServiceErrorTests.cs b/tests/Epam.Kafka.PubSub.Tests/Publication/PubServiceErrorTests.cs deleted file mode 100644 index fde6b43..0000000 --- a/tests/Epam.Kafka.PubSub.Tests/Publication/PubServiceErrorTests.cs +++ /dev/null @@ -1,275 +0,0 @@ -// Copyright © 2024 EPAM Systems - -using Castle.Components.DictionaryAdapter; - -using Confluent.Kafka; - -using Epam.Kafka.PubSub.Publication; -using Epam.Kafka.PubSub.Publication.Pipeline; -using Epam.Kafka.PubSub.Tests.Helpers; -using Epam.Kafka.Tests.Common; - -using Microsoft.Extensions.DependencyInjection; - -using Xunit; -using Xunit.Abstractions; - -namespace Epam.Kafka.PubSub.Tests.Publication; - -public class PubServiceErrorTests : TestWithServices, IClassFixture -{ - private readonly MockCluster _mockCluster; - - public PubServiceErrorTests(ITestOutputHelper output, MockCluster mockCluster) : base(output) - { - this._mockCluster = mockCluster ?? throw new ArgumentNullException(nameof(mockCluster)); - } - - [Fact] - public async Task HandlerCtorError() - { - using TestObserver observer = new(this, 2); - - TestSerializer serializer = new(observer); - - MockCluster.AddMockCluster(this) - .AddPublication(observer.Name, - ServiceLifetime.Scoped) - .WithValueSerializer(_ => serializer) - .WithOptions(options => - { - options.DefaultTopic = this.AnyTopicName; - options.PipelineRetryTimeout = TimeSpan.Zero; - }); - - await this.RunBackgroundServices(); - - // pipeline 1 - observer.AssertStart(); - observer.AssertStop(HandlerWithExceptionInConstructor.Exc); - - // pipeline 2 - observer.AssertStart(); - observer.AssertStop(HandlerWithExceptionInConstructor.Exc); - } - - [Fact] - public async Task GetBatchError() - { - TestException exc1 = new("Test1"); - TestException exc2 = new("Test2"); - TestException exc3 = new("Test3"); - - using TestObserver observer = new(this, 3); - - const int batchSize = 100; - - TestPublicationHandler handler = new TestPublicationHandler(false, observer) - .WithBatch(1, 100, exc1) - .WithBatch(2, 100, exc2) - .WithBatch(3, 100, exc3); - - TestSerializer serializer = new(observer); - - this.Services.AddScoped(_ => handler); - - MockCluster.AddMockCluster(this) - .AddPublication(observer.Name, ServiceLifetime.Scoped) - .WithValueSerializer(_ => serializer) - .WithOptions(options => - { - options.DefaultTopic = this.AnyTopicName; - options.BatchSize = batchSize; - }); - - await this.RunBackgroundServices(); - - handler.Verify(); - - // iteration 1 - - observer.AssertStart(); - observer.AssertNextActivity("src_read.Start"); - observer.AssertNextActivity("src_read.Stop"); - observer.AssertStop(exc1); - - // iteration 2 - observer.AssertStart(); - observer.AssertNextActivity("src_read.Start"); - observer.AssertNextActivity("src_read.Stop"); - observer.AssertStop(exc2); - - // iteration 3 - observer.AssertStart(); - observer.AssertNextActivity("src_read.Start"); - observer.AssertNextActivity("src_read.Stop"); - observer.AssertStop(exc3); - } - - [Theory] - [InlineData(0, 1, true)] - [InlineData(0, 1, false)] - [InlineData(1, 0, true)] - [InlineData(1, 0, false)] - public async Task SerializerErrorPartialPreprocessing(int successIndex, int errorIndex, bool transaction) - { - TestException[] exc = { new("S0"), new("S1") }; - - TestEntityKafka[] entity = { new(), new() }; - - TopicMessage[] message = entity.Select(x => x.ToMessage()).ToArray(); - - KeyValuePair report = message[errorIndex].ToReport(Offset.Unset, this.AnyTopicName, - Partition.Any, - 
ErrorCode.Local_ValueSerialization, PersistenceStatus.NotPersisted); - - using TestObserver observer = new(this, 1); - - const int batchSize = 100; - - TestPublicationHandler handler = new TestPublicationHandler(transaction, observer) - .WithBatch(1, batchSize, message).WithReport(1, report); - - TestSerializer serializer = new(observer); - serializer.WithSuccess(1, entity[successIndex]).WithError(1, exc[errorIndex], entity[errorIndex]); - - this.Services.AddScoped(_ => handler); - - MockCluster.AddMockCluster(this) - .AddPublication(observer.Name, ServiceLifetime.Scoped) - .WithValueSerializer(_ => serializer) - .WithOptions(options => - { - options.DefaultTopic = this.AnyTopicName; - options.BatchSize = batchSize; - if (transaction) - { - options.Producer = MockCluster.TransactionalProducer; - } - }).WithPartitioner(partitioner => partitioner.Default = (_, _, _, _) => 0); - - await this.RunBackgroundServices(); - - handler.Verify(); - - // iteration 1 - - observer.AssertStart(); - observer.AssertNextActivity("src_read.Start"); - observer.AssertNextActivity("src_read.Stop", 2); - - observer.AssertNextActivity("serialize.Start"); - observer.AssertNextActivity("serialize.Stop"); - - observer.AssertNextActivity("src_report.Start"); - observer.AssertNextActivity("src_report.Stop"); - observer.AssertStop(PublicationBatchResult.ProcessedPartial); - } - - [Theory] - [InlineData(0, 1, true)] - [InlineData(0, 1, false)] - [InlineData(1, 0, true)] - [InlineData(1, 0, false)] - public async Task SerializerErrorPartial(int successIndex, int errorIndex, bool transaction) - { - TestException[] exc = { new("S0"), new("S1") }; - - TestEntityKafka[] entity = { new(), new() }; - - TopicMessage[] message = entity.Select(x => x.ToMessage()).ToArray(); - - List> report = new EditableList> - { - message[errorIndex].ToReport(Offset.Unset, this.AnyTopicName, Partition.Any, - ErrorCode.Local_ValueSerialization, PersistenceStatus.NotPersisted) - }; - - if (successIndex == 0) - { - report.Add(message[successIndex].ToReport(0, this.AnyTopicName)); - } - - using TestObserver observer = new(this, 1); - - const int batchSize = 100; - - TestPublicationHandler handler = new TestPublicationHandler(transaction, observer) - .WithBatch(1, batchSize, message).WithReport(1, report.ToArray()); - - TestSerializer serializer = new(observer); - serializer.WithSuccess(1, entity[successIndex]).WithError(1, exc[errorIndex], entity[errorIndex]); - - this.Services.AddScoped(_ => handler); - - this._mockCluster.LaunchMockCluster(this) - .AddPublication(observer.Name, ServiceLifetime.Scoped) - .WithValueSerializer(_ => serializer) - .WithOptions(options => - { - options.SerializationPreprocessor = false; - options.DefaultTopic = this.AnyTopicName; - options.BatchSize = batchSize; - if (transaction) - { - options.Producer = MockCluster.TransactionalProducer; - } - }).WithPartitioner(partitioner => partitioner.Default = (_, _, _, _) => 0); - - await this.RunBackgroundServices(); - - handler.Verify(); - - // iteration 1 - - observer.AssertStart(); - observer.AssertNextActivity("src_read.Start"); - observer.AssertNextActivity("src_read.Stop", 2); - if (transaction) - { - observer.AssertNextActivity("init_transactions.Start"); - observer.AssertNextActivity("init_transactions.Stop"); - observer.AssertNextActivity("begin_transaction.Start"); - observer.AssertNextActivity("begin_transaction.Stop"); - } - - observer.AssertNextActivity("produce.Start"); - observer.AssertNextActivity("produce.Stop"); - 
observer.AssertNextActivity("src_report.Start"); - observer.AssertNextActivity("src_report.Stop"); - if (transaction) - { - observer.AssertNextActivity("abort_transaction.Start"); - observer.AssertNextActivity("abort_transaction.Stop"); - } - - observer.AssertStop(PublicationBatchResult.ProcessedPartial); - } - - private class HandlerWithExceptionInConstructor : IPublicationHandler - { - public HandlerWithExceptionInConstructor() - { - throw Exc; - } - - public static InvalidTimeZoneException Exc { get; } = new("test ctor"); - - public void TransactionCommitted(CancellationToken cancellationToken) - { - throw new NotImplementedException(); - } - - public IReadOnlyCollection> GetBatch(int count, bool transaction, - CancellationToken cancellationToken) - { - throw new NotImplementedException(); - } - - public void ReportResults(IDictionary, DeliveryReport> reports, - DateTimeOffset? transactionEnd, CancellationToken cancellationToken) - { - throw new NotImplementedException(); - } - } -} \ No newline at end of file diff --git a/tests/Epam.Kafka.PubSub.Tests/Publication/PubServiceSuccessTests.cs b/tests/Epam.Kafka.PubSub.Tests/Publication/PubServiceSuccessTests.cs index c1ebaf6..627ca87 100644 --- a/tests/Epam.Kafka.PubSub.Tests/Publication/PubServiceSuccessTests.cs +++ b/tests/Epam.Kafka.PubSub.Tests/Publication/PubServiceSuccessTests.cs @@ -70,86 +70,4 @@ public async Task Publish() // iteration 2 (empty) observer.AssertPubEmpty(); } - - [Fact] - public async Task PublishTransaction() - { - TestEntityKafka entity1 = new(); - PubSub.Publication.TopicMessage message1 = entity1.ToMessage(); - KeyValuePair report1 = message1.ToReport(0, this.AnyTopicName); - - TestEntityKafka entity2 = new(); - PubSub.Publication.TopicMessage message2 = entity2.ToMessage(); - KeyValuePair report2 = message2.ToReport(1, this.AnyTopicName); - - using TestObserver observer = new(this, 3); - - const int batchSize = 100; - - TestPublicationHandler handler = new TestPublicationHandler(true, observer) - .WithBatch(1, batchSize, message1).WithReport(1, report1).WithTransaction(1) - .WithBatch(2, batchSize, message2).WithReport(2, report2).WithTransaction(2) - .WithBatch(3, batchSize); - - TestSerializer serializer = new TestSerializer(observer).WithSuccess(1, entity1).WithSuccess(2, entity2); - - this.Services.AddScoped(_ => handler); - - this._mockCluster.LaunchMockCluster(this) - .AddPublication(observer.Name, ServiceLifetime.Scoped) - .WithValueSerializer(_ => serializer) - .WithOptions(options => - { - options.DefaultTopic = this.AnyTopicName; - options.BatchSize = batchSize; - options.Producer = MockCluster.TransactionalProducer; - }).WithPartitioner(partitioner => partitioner.Default = (_, _, _, _) => 0); - - await this.RunBackgroundServices(); - - serializer.Verify(); - handler.Verify(); - - // iteration 1 (one item to publish, create producer and init transactions) - - observer.AssertStart(); - observer.AssertNextActivity("src_read.Start"); - observer.AssertNextActivity("src_read.Stop", 1); - observer.AssertNextActivity("serialize.Start"); - observer.AssertNextActivity("serialize.Stop"); - observer.AssertNextActivity("init_transactions.Start"); - observer.AssertNextActivity("init_transactions.Stop"); - observer.AssertNextActivity("begin_transaction.Start"); - observer.AssertNextActivity("begin_transaction.Stop"); - observer.AssertNextActivity("produce.Start"); - observer.AssertNextActivity("produce.Stop"); - observer.AssertNextActivity("src_report.Start"); - observer.AssertNextActivity("src_report.Stop"); - 
observer.AssertNextActivity("commit_transaction.Start"); - observer.AssertNextActivity("commit_transaction.Stop"); - observer.AssertNextActivity("src_commit.Start"); - observer.AssertNextActivity("src_commit.Stop"); - observer.AssertStop(PublicationBatchResult.Processed); - - // iteration 2 (one item to publish) - observer.AssertStart(); - observer.AssertNextActivity("src_read.Start"); - observer.AssertNextActivity("src_read.Stop", 1); - observer.AssertNextActivity("serialize.Start"); - observer.AssertNextActivity("serialize.Stop"); - observer.AssertNextActivity("begin_transaction.Start"); - observer.AssertNextActivity("begin_transaction.Stop"); - observer.AssertNextActivity("produce.Start"); - observer.AssertNextActivity("produce.Stop"); - observer.AssertNextActivity("src_report.Start"); - observer.AssertNextActivity("src_report.Stop"); - observer.AssertNextActivity("commit_transaction.Start"); - observer.AssertNextActivity("commit_transaction.Stop"); - observer.AssertNextActivity("src_commit.Start"); - observer.AssertNextActivity("src_commit.Stop"); - observer.AssertStop(PublicationBatchResult.Processed); - - // iteration 3 (empty) - observer.AssertPubEmpty(); - } } \ No newline at end of file diff --git a/tests/Epam.Kafka.PubSub.Tests/Subscription/CombinedState/HandlerErrorTests.cs b/tests/Epam.Kafka.PubSub.Tests/Subscription/CombinedState/HandlerErrorTests.cs deleted file mode 100644 index 5ad3607..0000000 --- a/tests/Epam.Kafka.PubSub.Tests/Subscription/CombinedState/HandlerErrorTests.cs +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright © 2024 EPAM Systems - -using Confluent.Kafka; -using Epam.Kafka.PubSub.Subscription.Pipeline; -using Epam.Kafka.PubSub.Tests.Helpers; -using Epam.Kafka.Tests.Common; - -using Microsoft.Extensions.DependencyInjection; - -using Xunit; -using Xunit.Abstractions; - -namespace Epam.Kafka.PubSub.Tests.Subscription.CombinedState; - -[Collection(SubscribeTests.Name)] -public class HandlerErrorTests : TestWithServices -{ - private readonly MockCluster _mockCluster; - - public HandlerErrorTests(ITestOutputHelper output, MockCluster mockCluster) : base(output) - { - this._mockCluster = mockCluster ?? 
throw new ArgumentNullException(nameof(mockCluster)); - } - - [Fact] - public async Task TransientErrorWithAdaptiveBatchSize() - { - Exception exception = new TestException(); - - TopicPartition tp3 = new(this.AnyTopicName, 3); - - using TestObserver observer = new(this, 8); - - var handler = new TestSubscriptionHandler(observer); - var offsets = new TestOffsetsStorage(observer, 0, 1, 2); - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - this.Services.AddScoped(_ => offsets); - - observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer) - .WithSubscribeAndExternalOffsets().WithOptions(x => - { - x.BatchSize = 6; - }); - - Dictionary m1 = await MockCluster.SeedKafka(this, 6, tp3); - Dictionary m2 = await MockCluster.SeedKafka(this, 4, tp3); - - handler.WithError(2, exception, m1); - handler.WithError(3, exception, m1); - handler.WithError(4, exception, m1.Take(3)); - handler.WithError(5, exception, m1.Take(1)); - handler.WithSuccess(6, m1.Take(1)); - handler.WithSuccess(7, m1.Skip(1)); - handler.WithSuccess(8, m2); - - deserializer.WithSuccess(2, m1.Keys.ToArray()); - deserializer.WithSuccess(8, m2.Keys.ToArray()); - - var unset = new TopicPartitionOffset(tp3, Offset.Unset); - var offset1 = new TopicPartitionOffset(tp3, 1); - var offset6 = new TopicPartitionOffset(tp3, 6); - var offset10 = new TopicPartitionOffset(tp3, 10); - - offsets.WithGet(2, unset); - offsets.WithGet(3, unset); - offsets.WithGet(4, unset); - offsets.WithGet(5, unset); - offsets.WithGet(6, unset); - offsets.WithSetAndGetForNextIteration(6, offset1); - offsets.WithSetAndGetForNextIteration(7, offset6); - offsets.WithSet(8, offset10); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - offsets.Verify(); - - // iteration 1 - observer.AssertSubNotAssigned(); - - // iteration 2 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(6); - observer.AssertProcess(); - observer.AssertStop(exception); - - // iteration 3 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertProcess(); - observer.AssertStop(exception); - - // iteration 4 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertProcess(); - observer.AssertStop(exception); - - // iteration 5 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertProcess(); - observer.AssertStop(exception); - - // iteration 6 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 7 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 8 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(4); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - } -} \ No newline at end of file diff --git a/tests/Epam.Kafka.PubSub.Tests/Subscription/CombinedState/ReadTests.cs b/tests/Epam.Kafka.PubSub.Tests/Subscription/CombinedState/ReadTests.cs deleted file mode 100644 index af6caff..0000000 --- a/tests/Epam.Kafka.PubSub.Tests/Subscription/CombinedState/ReadTests.cs +++ /dev/null @@ -1,138 +0,0 @@ -// Copyright © 2024 EPAM Systems - -using Confluent.Kafka; - -using 
Epam.Kafka.PubSub.Subscription.Pipeline; -using Epam.Kafka.PubSub.Tests.Helpers; -using Epam.Kafka.Tests.Common; - -using Microsoft.Extensions.DependencyInjection; - -using Xunit; -using Xunit.Abstractions; - -namespace Epam.Kafka.PubSub.Tests.Subscription.CombinedState; - -public class ReadTests : TestWithServices, IClassFixture -{ - private readonly MockCluster _mockCluster; - - public ReadTests(ITestOutputHelper output, MockCluster mockCluster) : base(output) - { - this._mockCluster = mockCluster ?? throw new ArgumentNullException(nameof(mockCluster)); - } - - [Fact] - public async Task OneBatchTwoPartitions() - { - TopicPartition tp1 = new(this.AnyTopicName, 1); - TopicPartition tp2 = new(this.AnyTopicName, 2); - - using TestObserver observer = new(this, 3); - - var handler = new TestSubscriptionHandler(observer); - var offsets = new TestOffsetsStorage(observer, 0, 3); - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - this.Services.AddScoped(_ => offsets); - - observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer) - .WithSubscribeAndExternalOffsets(); - - Dictionary m1 = await MockCluster.SeedKafka(this, 5, tp1); - Dictionary m2 = await MockCluster.SeedKafka(this, 5, tp2); - - handler.WithSuccess(2, m1.Concat(m2)); - deserializer.WithSuccess(2, m1.Keys.Concat(m2.Keys).ToArray()); - - offsets.WithGet(2, new TopicPartitionOffset(tp1, Offset.Unset), new TopicPartitionOffset(tp2, Offset.Unset)); - offsets.WithSetAndGetForNextIteration(2, new TopicPartitionOffset(tp1, 5), new TopicPartitionOffset(tp2, 5)); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - offsets.Verify(); - - // iteration 1 - observer.AssertSubNotAssigned(); - - // iteration 2 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(10); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 3 - observer.AssertSubEmpty(); - } - - [Fact] - public async Task OnePartitionTwoBatches() - { - TopicPartition tp3 = new(this.AnyTopicName, 3); - - using TestObserver observer = new(this, 4); - - var handler = new TestSubscriptionHandler(observer); - var offsets = new TestOffsetsStorage(observer, 0, 1, 2); - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - this.Services.AddScoped(_ => offsets); - - observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer) - .WithSubscribeAndExternalOffsets().WithOptions(x => x.BatchSize = 5); - - Dictionary m1 = await MockCluster.SeedKafka(this, 5, tp3); - Dictionary m2 = await MockCluster.SeedKafka(this, 5, tp3); - - handler.WithSuccess(2, m1); - handler.WithSuccess(3, m2); - deserializer.WithSuccess(2, m1.Keys.ToArray()); - deserializer.WithSuccess(3, m2.Keys.ToArray()); - - var unset = new TopicPartitionOffset(tp3, Offset.Unset); - var offset5 = new TopicPartitionOffset(tp3, 5); - var offset10 = new TopicPartitionOffset(tp3, 10); - - offsets.WithGet(2, unset); - - offsets.WithSetAndGetForNextIteration(2, offset5); - offsets.WithSetAndGetForNextIteration(3, offset10); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - offsets.Verify(); - - // iteration 1 - observer.AssertSubNotAssigned(); - - // iteration 2 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(5); - observer.AssertProcess(); - 
observer.AssertCommitExternal(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 3 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(5); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 4 - observer.AssertSubEmpty(); - } -} \ No newline at end of file diff --git a/tests/Epam.Kafka.PubSub.Tests/Subscription/CombinedState/ResetTests.cs b/tests/Epam.Kafka.PubSub.Tests/Subscription/CombinedState/ResetTests.cs deleted file mode 100644 index c1d2c73..0000000 --- a/tests/Epam.Kafka.PubSub.Tests/Subscription/CombinedState/ResetTests.cs +++ /dev/null @@ -1,213 +0,0 @@ -// Copyright © 2024 EPAM Systems - -using Confluent.Kafka; -using Epam.Kafka.PubSub.Subscription.Pipeline; -using Epam.Kafka.PubSub.Tests.Helpers; -using Epam.Kafka.Tests.Common; - -using Microsoft.Extensions.DependencyInjection; - -using Xunit; -using Xunit.Abstractions; - -namespace Epam.Kafka.PubSub.Tests.Subscription.CombinedState; - -public class ResetTests : TestWithServices, IClassFixture -{ - private readonly MockCluster _mockCluster; - - public ResetTests(ITestOutputHelper output, MockCluster mockCluster) : base(output) - { - this._mockCluster = mockCluster ?? throw new ArgumentNullException(nameof(mockCluster)); - } - - [Fact] - public async Task OneRunningOneStarting() - { - TopicPartition tp1 = new(this.AnyTopicName, 1); - TopicPartition tp2 = new(this.AnyTopicName, 2); - - using TestObserver observer = new(this, 4); - - var handler = new TestSubscriptionHandler(observer); - var offsets = new TestOffsetsStorage(observer, 0, 3); - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - this.Services.AddScoped(_ => offsets); - - observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer) - .WithSubscribeAndExternalOffsets(); - - Dictionary m1 = await MockCluster.SeedKafka(this, 5, tp1); - Dictionary m2 = await MockCluster.SeedKafka(this, 5, tp2); - - handler.WithSuccess(2, m2); - handler.WithSuccess(3, m1); - deserializer.WithSuccess(2, m2.Keys.ToArray()); - deserializer.WithSuccess(3, m1.Keys.ToArray()); - - var p1OffsetEnd = new TopicPartitionOffset(tp1, Offset.End); - - var p1Offset0 = new TopicPartitionOffset(tp1, 0); - var p2Offset0 = new TopicPartitionOffset(tp2, 0); - - var p1Offset5 = new TopicPartitionOffset(tp1, 5); - var p2Offset5 = new TopicPartitionOffset(tp2, 5); - - offsets.WithGet(2, p1OffsetEnd, p2Offset0); - offsets.WithSet(2, p2Offset5); - - offsets.WithGet(3, p1Offset0, p2Offset5); - offsets.WithSet(3, p1Offset5); - - offsets.WithGet(4, p1Offset5, p2Offset5); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - offsets.Verify(); - - // iteration 1 - observer.AssertSubNotAssigned(); - - // iteration 2 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(5); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 3 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(5); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 4 - observer.AssertSubEmpty(); - } - - [Fact] - public async Task AllPausedStartOne() - { - 
TopicPartition tp3 = new(this.AnyTopicName, 3); - - using TestObserver observer = new(this, 3); - - var handler = new TestSubscriptionHandler(observer); - var offsets = new TestOffsetsStorage(observer, 0, 1, 2); - - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - this.Services.AddScoped(_ => offsets); - - observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer) - .WithSubscribeAndExternalOffsets(); - - Dictionary m1 = await MockCluster.SeedKafka(this, 5, tp3); - - handler.WithSuccess(3, m1); - deserializer.WithSuccess(3, m1.Keys.ToArray()); - - var end = new TopicPartitionOffset(tp3, Offset.End); - var offset0 = new TopicPartitionOffset(tp3, 0); - var offset5 = new TopicPartitionOffset(tp3, 5); - - offsets.WithGet(2, end); - - offsets.WithGet(3, offset0); - offsets.WithSet(3, offset5); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - offsets.Verify(); - - // iteration 1 - observer.AssertSubNotAssigned(); - - // iteration 2 - observer.AssertSubPaused(); - - // iteration 3 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(5); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - } - - [Theory] - [InlineData(false)] - [InlineData(true)] - public async Task AllPausedStopOne(bool onCommit) - { - TopicPartition tp3 = new(this.AnyTopicName, 3); - - using TestObserver observer = new(this, 3); - - var handler = new TestSubscriptionHandler(observer); - var offsets = new TestOffsetsStorage(observer, 0, 1, 2); - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - this.Services.AddScoped(_ => offsets); - - observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer) - .WithSubscribeAndExternalOffsets().WithOptions(x => x.BatchSize = 5); - - var m1 = (await MockCluster.SeedKafka(this, 10, tp3)).Take(5).ToDictionary(p => p.Key, p => p.Value); - - handler.WithSuccess(2, m1); - deserializer.WithSuccess(2, m1.Keys.ToArray()); - - var offset0 = new TopicPartitionOffset(tp3, 0); - var offset5 = new TopicPartitionOffset(tp3, 5); - var offset10 = new TopicPartitionOffset(tp3, 10); - - offsets.WithGet(2, offset0); - - if (onCommit) - { - offsets.WithReset(2, offset5, offset10); - } - else - { - offsets.WithSet(2, offset5); - } - - offsets.WithGet(3, offset10); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - offsets.Verify(); - - // iteration 1 - observer.AssertSubNotAssigned(); - - // iteration 2 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(5); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 3 - observer.AssertSubEmpty(); - } -} \ No newline at end of file diff --git a/tests/Epam.Kafka.PubSub.Tests/Subscription/CombinedState/SerializationErrorTests.cs b/tests/Epam.Kafka.PubSub.Tests/Subscription/CombinedState/SerializationErrorTests.cs deleted file mode 100644 index 815d668..0000000 --- a/tests/Epam.Kafka.PubSub.Tests/Subscription/CombinedState/SerializationErrorTests.cs +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright © 2024 EPAM Systems - -using Confluent.Kafka; - -using Epam.Kafka.PubSub.Subscription.Pipeline; -using Epam.Kafka.PubSub.Tests.Helpers; -using Epam.Kafka.Tests.Common; - -using 
Microsoft.Extensions.DependencyInjection; - -using Xunit; -using Xunit.Abstractions; - -namespace Epam.Kafka.PubSub.Tests.Subscription.CombinedState; - -[Collection(SubscribeTests.Name)] -public class SerializationErrorTests : TestWithServices -{ - private readonly MockCluster _mockCluster; - - public SerializationErrorTests(ITestOutputHelper output, MockCluster mockCluster) : base(output) - { - this._mockCluster = mockCluster ?? throw new ArgumentNullException(nameof(mockCluster)); - } - - [Fact] - public async Task SinglePartitionAtBeginning() - { - TopicPartition tp3 = new(this.AnyTopicName, 3); - - TestException exc = new(); - using TestObserver observer = new(this, 5); - - var handler = new TestSubscriptionHandler(observer); - var offsets = new TestOffsetsStorage(observer, 0, 1, 2); - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - this.Services.AddScoped(_ => offsets); - - observer.CreateDefaultSubscription(this._mockCluster).WithSubscribeAndExternalOffsets().WithValueDeserializer(_ => deserializer) - .WithOptions(x => - { - x.BatchNotAssignedTimeout = TimeSpan.FromSeconds(10); - x.BatchRetryCount = 1; - }); - - Dictionary m1 = await MockCluster.SeedKafka(this, 5, tp3); - - deserializer.WithError(2, exc, m1.Keys.First()); - deserializer.WithError(3, exc, m1.Keys.First()); - deserializer.WithError(5, exc, m1.Keys.First()); - - TopicPartitionOffset unset = new (tp3, Offset.Unset); - offsets.WithGet(2, unset); - offsets.WithGet(3, unset); - offsets.WithGet(5, unset); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - - // iteration 1 - observer.AssertSubNotAssigned(); - - // iteration 2 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(); - observer.AssertStop("Value deserialization error"); - - // iteration 3 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(); - observer.AssertStop("Value deserialization error"); - - // iteration 4 partition not assigned until 6 sec session timeout elapsed - observer.AssertSubNotAssigned(); - - // iteration 5 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(); - observer.AssertStop("Value deserialization error"); - } - - [Fact] - public async Task SinglePartitionInTheMiddleOfBatch() - { - TopicPartition tp3 = new(this.AnyTopicName, 3); - - TestException exc = new(); - using TestObserver observer = new(this, 3); - - var handler = new TestSubscriptionHandler(observer); - var offsets = new TestOffsetsStorage(observer, 0, 1, 2); - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - this.Services.AddScoped(_ => offsets); - - observer.CreateDefaultSubscription(this._mockCluster).WithSubscribeAndExternalOffsets().WithValueDeserializer(_ => deserializer) - .WithOptions(x => x.BatchNotAssignedTimeout = TimeSpan.FromSeconds(10)); - - Dictionary m1 = await MockCluster.SeedKafka(this, 5, tp3); - - deserializer.WithSuccess(2, m1.Keys.ElementAt(0)); - deserializer.WithError(2, exc, m1.Keys.ElementAt(1)); - deserializer.WithError(3, exc, m1.Keys.ElementAt(1)); - - handler.WithSuccess(2, m1.Take(1)); - - TopicPartitionOffset unset = new (tp3, Offset.Unset); - TopicPartitionOffset offset1 = new (tp3, 1); - - offsets.WithGet(2, unset); - offsets.WithSetAndGetForNextIteration(2, offset1); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - - // iteration 1 - observer.AssertSubNotAssigned(); - - // iteration 2 process deserialized 
items before error - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(1); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 3 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(); - observer.AssertStop("Value deserialization error"); - } -} \ No newline at end of file diff --git a/tests/Epam.Kafka.PubSub.Tests/Subscription/CombinedState/StateErrorTests.cs b/tests/Epam.Kafka.PubSub.Tests/Subscription/CombinedState/StateErrorTests.cs deleted file mode 100644 index 85c1cf2..0000000 --- a/tests/Epam.Kafka.PubSub.Tests/Subscription/CombinedState/StateErrorTests.cs +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright © 2024 EPAM Systems - -using Confluent.Kafka; - -using Epam.Kafka.PubSub.Tests.Helpers; -using Epam.Kafka.Tests.Common; - -using Microsoft.Extensions.DependencyInjection; - -using Xunit; -using Xunit.Abstractions; - -namespace Epam.Kafka.PubSub.Tests.Subscription.CombinedState; - -[Collection(SubscribeTests.Name)] -public class StateErrorTests : TestWithServices -{ - private readonly MockCluster _mockCluster; - - public StateErrorTests(ITestOutputHelper output, MockCluster mockCluster) : base(output) - { - this._mockCluster = mockCluster ?? throw new ArgumentNullException(nameof(mockCluster)); - } - - [Fact] - public async Task ErrorOnSet() - { - TopicPartition tp3 = new(this.AnyTopicName, 3); - TestException exception = new (); - - using TestObserver observer = new(this, 3); - - var handler = new TestSubscriptionHandler(observer); - var offsets = new TestOffsetsStorage(observer, 0, 1, 2); - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - this.Services.AddScoped(_ => offsets); - - observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer) - .WithSubscribeAndExternalOffsets(); - - Dictionary m1 = await MockCluster.SeedKafka(this, 5, tp3); - - handler.WithSuccess(2, m1); - handler.WithSuccess(3, m1); - deserializer.WithSuccess(2, m1.Keys.ToArray()); - - var unset = new TopicPartitionOffset(tp3, Offset.Unset); - var offset5 = new TopicPartitionOffset(tp3, 5); - - offsets.WithGet(2, unset); - offsets.WithSetError(2, exception, offset5); - offsets.WithGet(3, unset); - offsets.WithSetError(3, exception, offset5); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - offsets.Verify(); - - // iteration 1 - observer.AssertSubNotAssigned(); - - // iteration 2 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(5); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertStop(exception); - - // iteration 3 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertStop(exception); - } - - [Fact] - public async Task ErrorOnGet() - { - TestException exception = new(); - - using TestObserver observer = new(this, 4); - - var handler = new TestSubscriptionHandler(observer); - var offsets = new TestOffsetsStorage(observer); - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - this.Services.AddScoped(_ => offsets); - - observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer) - .WithSubscribeAndExternalOffsets(); - - offsets.WithGetError(2, exception); - offsets.WithGetError(4, exception); - - await MockCluster.SeedKafka(this, 
1, new TopicPartition(this.AnyTopicName, 0)); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - offsets.Verify(); - - // iteration 1 - observer.AssertSubNotAssigned(); - - // iteration 2 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(0); - observer.AssertStop(exception); - - // iteration 3 - observer.AssertSubNotAssigned(); - - // iteration 4 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(0); - observer.AssertStop(exception); - } -} \ No newline at end of file diff --git a/tests/Epam.Kafka.PubSub.Tests/Subscription/ExternalState/HandlerErrorTests.cs b/tests/Epam.Kafka.PubSub.Tests/Subscription/ExternalState/HandlerErrorTests.cs deleted file mode 100644 index 0dc8463..0000000 --- a/tests/Epam.Kafka.PubSub.Tests/Subscription/ExternalState/HandlerErrorTests.cs +++ /dev/null @@ -1,191 +0,0 @@ -// Copyright © 2024 EPAM Systems - -using Confluent.Kafka; -using Epam.Kafka.PubSub.Subscription.Options; -using Epam.Kafka.PubSub.Subscription.Pipeline; -using Epam.Kafka.PubSub.Tests.Helpers; -using Epam.Kafka.Tests.Common; -using Microsoft.Extensions.DependencyInjection; -using Shouldly; - -using Xunit; -using Xunit.Abstractions; - -namespace Epam.Kafka.PubSub.Tests.Subscription.ExternalState; - -public class HandlerErrorTests : TestWithServices, IClassFixture -{ - private readonly MockCluster _mockCluster; - - public HandlerErrorTests(ITestOutputHelper output, MockCluster mockCluster) : base(output) - { - this._mockCluster = mockCluster ?? throw new ArgumentNullException(nameof(mockCluster)); - } - - [Fact] - public async Task TransientErrorWithAdaptiveBatchSize() - { - Exception exception = new TestException(); - - TopicPartition tp3 = new(this.AnyTopicName, 3); - - using TestObserver observer = new(this, 7); - - var handler = new TestSubscriptionHandler(observer); - var offsets = new TestOffsetsStorage(observer); - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - this.Services.AddScoped(_ => offsets); - - observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer) - .WithAssignAndExternalOffsets().WithOptions(x => - { - x.BatchSize = 6; - x.WithTopicPartitions(tp3); - }); - - Dictionary m1 = await MockCluster.SeedKafka(this, 6, tp3); - Dictionary m2 = await MockCluster.SeedKafka(this, 4, tp3); - - handler.WithError(1, exception, m1); - handler.WithError(2, exception, m1); - handler.WithError(3, exception, m1.Take(3)); - handler.WithError(4, exception, m1.Take(1)); - handler.WithSuccess(5, m1.Take(1)); - handler.WithSuccess(6, m1.Skip(1)); - handler.WithSuccess(7, m2); - - deserializer.WithSuccess(1, m1.Keys.ToArray()); - deserializer.WithSuccess(7, m2.Keys.ToArray()); - - var unset = new TopicPartitionOffset(tp3, Offset.Unset); - var offset1 = new TopicPartitionOffset(tp3, 1); - var offset6 = new TopicPartitionOffset(tp3, 6); - var offset10 = new TopicPartitionOffset(tp3, 10); - - offsets.WithGet(1, unset); - offsets.WithGet(2, unset); - offsets.WithGet(3, unset); - offsets.WithGet(4, unset); - offsets.WithGet(5, unset); - offsets.WithSetAndGetForNextIteration(5, offset1); - offsets.WithSetAndGetForNextIteration(6, offset6); - offsets.WithSet(7, offset10); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - offsets.Verify(); - - // iteration 1 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(6); - observer.AssertProcess(); - observer.AssertStop(exception); - - // 
iteration 2 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertProcess(); - observer.AssertStop(exception); - - // iteration 3 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertProcess(); - observer.AssertStop(exception); - - // iteration 4 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertProcess(); - observer.AssertStop(exception); - - // iteration 5 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 6 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 7 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(4); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - } - - [Fact] - public async Task UnableToResolveHandler() - { - TopicPartition tp1 = new(this.AnyTopicName, 1); - - using TestObserver observer = new(this, 3); - - var offsets = new TestOffsetsStorage(observer); - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => offsets); - - observer.CreateDefaultSubscription(this._mockCluster) - .WithValueDeserializer(_ => deserializer) - .WithAssignAndExternalOffsets() - .WithOptions(options => { options.WithTopicPartitions(tp1); }); - - Dictionary entities = await MockCluster.SeedKafka(this, 5, tp1); - deserializer.WithSuccess(1, entities.Keys.ToArray()); - offsets.WithGet(1, new TopicPartitionOffset(tp1, 0)); - - InvalidOperationException exc = await Assert.ThrowsAsync(this.RunBackgroundServices); - - exc.Message.ShouldContain( - "Unable to resolve service for type 'Epam.Kafka.PubSub.Tests.Helpers.TestObserver' while attempting"); - - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(); - observer.AssertStop(exc); - } - - [Fact] - public async Task UnableToResolveState() - { - TopicPartition tp1 = new(this.AnyTopicName, 1); - - this.Services.AddScoped(); - - using TestObserver observer = new(this, 3); - - var deserializer = new TestDeserializer(observer); - - observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer) - .WithAssignAndExternalOffsets() - .WithOptions(options => { options.WithTopicPartitions(tp1); }); - - Dictionary entities = await MockCluster.SeedKafka(this, 5, tp1); - deserializer.WithSuccess(2, entities.Keys.ToArray()); - - InvalidOperationException exc = await Assert.ThrowsAsync(this.RunBackgroundServices); - - exc.Message.ShouldContain( - "No constructor for type 'Epam.Kafka.PubSub.Tests.Helpers.TestOffsetsStorage' can be instantiated"); - - observer.AssertStart(); - observer.AssertStop(exc); - } -} \ No newline at end of file diff --git a/tests/Epam.Kafka.PubSub.Tests/Subscription/ExternalState/ReadTests.cs b/tests/Epam.Kafka.PubSub.Tests/Subscription/ExternalState/ReadTests.cs deleted file mode 100644 index e001663..0000000 --- a/tests/Epam.Kafka.PubSub.Tests/Subscription/ExternalState/ReadTests.cs +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright © 2024 EPAM Systems - -using Confluent.Kafka; - -using Epam.Kafka.PubSub.Subscription.Options; -using Epam.Kafka.PubSub.Subscription.Pipeline; -using Epam.Kafka.PubSub.Tests.Helpers; -using Epam.Kafka.Tests.Common; - 
-using Microsoft.Extensions.DependencyInjection; - -using Xunit; -using Xunit.Abstractions; - -namespace Epam.Kafka.PubSub.Tests.Subscription.ExternalState; - -public class ReadTests : TestWithServices, IClassFixture -{ - private readonly MockCluster _mockCluster; - - public ReadTests(ITestOutputHelper output, MockCluster mockCluster) : base(output) - { - this._mockCluster = mockCluster ?? throw new ArgumentNullException(nameof(mockCluster)); - } - - [Fact] - public async Task OneBatchTwoPartitions() - { - TopicPartition tp1 = new(this.AnyTopicName, 1); - TopicPartition tp2 = new(this.AnyTopicName, 2); - - using TestObserver observer = new(this, 2); - - var handler = new TestSubscriptionHandler(observer); - var offsets = new TestOffsetsStorage(observer); - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - this.Services.AddScoped(_ => offsets); - - observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer) - .WithAssignAndExternalOffsets().WithOptions(x => x.WithTopicPartitions(tp1, tp2)); - - Dictionary m1 = await MockCluster.SeedKafka(this, 5, tp1); - Dictionary m2 = await MockCluster.SeedKafka(this, 5, tp2); - - handler.WithSuccess(1, m1.Concat(m2)); - deserializer.WithSuccess(1, m1.Keys.ToArray()); - deserializer.WithSuccess(1, m2.Keys.ToArray()); - - var p1Unset = new TopicPartitionOffset(tp1, Offset.Unset); - var p2Unset = new TopicPartitionOffset(tp2, Offset.Unset); - var p1Offset5 = new TopicPartitionOffset(tp1, 5); - var p2Offset5 = new TopicPartitionOffset(tp2, 5); - - offsets.WithGet(1, p1Unset, p2Unset); - offsets.WithSetAndGetForNextIteration(1, p1Offset5, p2Offset5); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - offsets.Verify(); - - // iteration 1 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(10); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 2 - observer.AssertSubEmpty(); - } - - [Fact] - public async Task OnePartitionTwoBatches() - { - TopicPartition tp1 = new(this.AnyTopicName, 1); - - using TestObserver observer = new(this, 3); - - var handler = new TestSubscriptionHandler(observer); - var offsets = new TestOffsetsStorage(observer); - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - this.Services.AddScoped(_ => offsets); - - observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer) - .WithAssignAndExternalOffsets().WithOptions(x => - { - x.WithTopicPartitions(new TopicPartition(this.AnyTopicName, 1)); - x.BatchSize = 5; - }); - - Dictionary m1 = await MockCluster.SeedKafka(this, 5, tp1); - Dictionary m2 = await MockCluster.SeedKafka(this, 5, tp1); - - handler.WithSuccess(1, m1); - handler.WithSuccess(2, m2); - - deserializer.WithSuccess(1, m1.Keys.ToArray()); - deserializer.WithSuccess(2, m2.Keys.ToArray()); - - offsets.WithGet(1, new TopicPartitionOffset(tp1, Offset.Unset)); - offsets.WithSetAndGetForNextIteration(1, new TopicPartitionOffset(tp1, 5)); - offsets.WithSetAndGetForNextIteration(2, new TopicPartitionOffset(tp1, 10)); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - offsets.Verify(); - - // iteration 1 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(5); - observer.AssertProcess(); - observer.AssertCommitExternal(); - 
observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 2 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(5); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 3 - observer.AssertSubEmpty(); - } -} \ No newline at end of file diff --git a/tests/Epam.Kafka.PubSub.Tests/Subscription/ExternalState/ResetTests.cs b/tests/Epam.Kafka.PubSub.Tests/Subscription/ExternalState/ResetTests.cs deleted file mode 100644 index 466fa97..0000000 --- a/tests/Epam.Kafka.PubSub.Tests/Subscription/ExternalState/ResetTests.cs +++ /dev/null @@ -1,207 +0,0 @@ -// Copyright © 2024 EPAM Systems - -using Confluent.Kafka; -using Epam.Kafka.PubSub.Subscription.Options; -using Epam.Kafka.PubSub.Subscription.Pipeline; -using Epam.Kafka.PubSub.Tests.Helpers; -using Epam.Kafka.Tests.Common; -using Microsoft.Extensions.DependencyInjection; -using Xunit; -using Xunit.Abstractions; - -namespace Epam.Kafka.PubSub.Tests.Subscription.ExternalState; - -public class ResetTests : TestWithServices, IClassFixture -{ - private readonly MockCluster _mockCluster; - - public ResetTests(ITestOutputHelper output, MockCluster mockCluster) : base(output) - { - this._mockCluster = mockCluster ?? throw new ArgumentNullException(nameof(mockCluster)); - } - - [Fact] - public async Task OneRunningOneStarting() - { - TopicPartition tp1 = new(this.AnyTopicName, 1); - TopicPartition tp2 = new(this.AnyTopicName, 2); - - using TestObserver observer = new(this, 3); - - var handler = new TestSubscriptionHandler(observer); - var offsets = new TestOffsetsStorage(observer); - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - this.Services.AddScoped(_ => offsets); - - observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer) - .WithAssignAndExternalOffsets().WithOptions(x => x.WithTopicPartitions(tp1, tp2)); - - Dictionary m1 = await MockCluster.SeedKafka(this, 5, tp1); - Dictionary m2 = await MockCluster.SeedKafka(this, 5, tp2); - - handler.WithSuccess(1, m2); - handler.WithSuccess(2, m1); - deserializer.WithSuccess(1, m2.Keys.ToArray()); - deserializer.WithSuccess(2, m1.Keys.ToArray()); - - var p1End = new TopicPartitionOffset(tp1, Offset.End); - - var p1Offset0 = new TopicPartitionOffset(tp1, 0); - var p2Offset0 = new TopicPartitionOffset(tp2, 0); - - var p1Offset5 = new TopicPartitionOffset(tp1, 5); - var p2Offset5 = new TopicPartitionOffset(tp2, 5); - - offsets.WithGet(1, p1End, p2Offset0); - offsets.WithSet(1, p2Offset5); - - offsets.WithGet(2, p1Offset0, p2Offset5); - offsets.WithSet(2, p1Offset5); - - offsets.WithGet(3, p1Offset5, p2Offset5); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - offsets.Verify(); - - // iteration 1 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(5); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 2 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(5); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 3 - observer.AssertSubEmpty(); - } - - [Fact] - public async Task AllPausedStartOne() - { - TopicPartition tp3 = 
new(this.AnyTopicName, 3); - - using TestObserver observer = new(this, 2); - - var handler = new TestSubscriptionHandler(observer); - var offsets = new TestOffsetsStorage(observer); - - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - this.Services.AddScoped(_ => offsets); - - observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer) - .WithAssignAndExternalOffsets().WithOptions(x=>x.WithTopicPartitions(tp3)); - - Dictionary m1 = await MockCluster.SeedKafka(this, 5, tp3); - - handler.WithSuccess(2, m1); - deserializer.WithSuccess(2, m1.Keys.ToArray()); - - var end = new TopicPartitionOffset(tp3, Offset.End); - var offset0 = new TopicPartitionOffset(tp3, 0); - var offset5 = new TopicPartitionOffset(tp3, 5); - - offsets.WithGet(1, end); - - offsets.WithGet(2, offset0); - offsets.WithSet(2, offset5); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - offsets.Verify(); - - // iteration 1 - observer.AssertSubPaused(); - - // iteration 2 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(5); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - } - - [Theory] - [InlineData(false)] - [InlineData(true)] - public async Task AllPausedStopOne(bool onCommit) - { - TopicPartition tp3 = new(this.AnyTopicName, 3); - - using TestObserver observer = new(this, 2); - - var handler = new TestSubscriptionHandler(observer); - var offsets = new TestOffsetsStorage(observer); - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - this.Services.AddScoped(_ => offsets); - - observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer) - .WithAssignAndExternalOffsets().WithOptions(x => - { - x.WithTopicPartitions(tp3); - x.BatchSize = 5; - }); - - var m1 = (await MockCluster.SeedKafka(this, 10, tp3)).Take(5).ToDictionary(p => p.Key, p => p.Value); - - handler.WithSuccess(1, m1); - deserializer.WithSuccess(1, m1.Keys.ToArray()); - - var offset0 = new TopicPartitionOffset(tp3, 0); - var offset5 = new TopicPartitionOffset(tp3, 5); - var offset10 = new TopicPartitionOffset(tp3, 10); - - offsets.WithGet(1, offset0); - - if (onCommit) - { - offsets.WithReset(1, offset5, offset10); - } - else - { - offsets.WithSet(1, offset5); - } - - offsets.WithGet(2, offset10); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - offsets.Verify(); - - // iteration 1 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(5); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 2 - observer.AssertSubEmpty(); - } -} \ No newline at end of file diff --git a/tests/Epam.Kafka.PubSub.Tests/Subscription/ExternalState/SerializationErrorTests.cs b/tests/Epam.Kafka.PubSub.Tests/Subscription/ExternalState/SerializationErrorTests.cs deleted file mode 100644 index e43b28d..0000000 --- a/tests/Epam.Kafka.PubSub.Tests/Subscription/ExternalState/SerializationErrorTests.cs +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright © 2024 EPAM Systems - -using Confluent.Kafka; -using Epam.Kafka.PubSub.Subscription.Options; -using Epam.Kafka.PubSub.Subscription.Pipeline; -using Epam.Kafka.PubSub.Tests.Helpers; -using Epam.Kafka.Tests.Common; -using 
Microsoft.Extensions.DependencyInjection; -using Xunit; -using Xunit.Abstractions; - -namespace Epam.Kafka.PubSub.Tests.Subscription.ExternalState; - -public class SerializationErrorTests : TestWithServices, IClassFixture -{ - private readonly MockCluster _mockCluster; - - public SerializationErrorTests(ITestOutputHelper output, MockCluster mockCluster) : base(output) - { - this._mockCluster = mockCluster ?? throw new ArgumentNullException(nameof(mockCluster)); - } - - [Fact] - public async Task SinglePartitionAtBeginning() - { - TopicPartition tp3 = new(this.AnyTopicName, 2); - - TestException exc = new(); - using TestObserver observer = new(this, 3); - - var handler = new TestSubscriptionHandler(observer); - var offsets = new TestOffsetsStorage(observer); - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - this.Services.AddScoped(_ => offsets); - - observer.CreateDefaultSubscription(this._mockCluster).WithAssignAndExternalOffsets().WithValueDeserializer(_ => deserializer) - .WithOptions(x => x.WithTopicPartitions(tp3)); - - Dictionary m1 = await MockCluster.SeedKafka(this, 5, tp3); - - deserializer.WithError(1, exc, m1.Keys.First()); - deserializer.WithError(2, exc, m1.Keys.First()); - - TopicPartitionOffset unset = new (tp3, Offset.Unset); - offsets.WithGet(1, unset); - offsets.WithGet(2, unset); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - - // iteration 1 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(); - observer.AssertStop("Value deserialization error"); - - // iteration 2 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(); - observer.AssertStop("Value deserialization error"); - } - - [Fact] - public async Task SinglePartitionInTheMiddleOfBatch() - { - TopicPartition tp3 = new(this.AnyTopicName, 3); - - TestException exc = new(); - using TestObserver observer = new(this, 2); - - var handler = new TestSubscriptionHandler(observer); - var offsets = new TestOffsetsStorage(observer); - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - this.Services.AddScoped(_ => offsets); - - observer.CreateDefaultSubscription(this._mockCluster).WithAssignAndExternalOffsets().WithValueDeserializer(_ => deserializer) - .WithOptions(x => x.WithTopicPartitions(tp3)); - - Dictionary m1 = await MockCluster.SeedKafka(this, 5, tp3); - - deserializer.WithSuccess(1, m1.Keys.ElementAt(0)); - deserializer.WithError(1, exc, m1.Keys.ElementAt(1)); - deserializer.WithError(2, exc, m1.Keys.ElementAt(1)); - - handler.WithSuccess(1, m1.Take(1)); - - TopicPartitionOffset unset = new (tp3, Offset.Unset); - TopicPartitionOffset offset1 = new (tp3, 1); - - offsets.WithGet(1, unset); - offsets.WithSetAndGetForNextIteration(1, offset1); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - - // iteration 1 process deserialized items before error - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(1); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 2 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(); - observer.AssertStop("Value deserialization error"); - } -} \ No newline at end of file diff --git a/tests/Epam.Kafka.PubSub.Tests/Subscription/ExternalState/StateErrorTests.cs 
b/tests/Epam.Kafka.PubSub.Tests/Subscription/ExternalState/StateErrorTests.cs deleted file mode 100644 index 2babe1f..0000000 --- a/tests/Epam.Kafka.PubSub.Tests/Subscription/ExternalState/StateErrorTests.cs +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright © 2024 EPAM Systems - -using Confluent.Kafka; - -using Epam.Kafka.PubSub.Subscription.Options; -using Epam.Kafka.PubSub.Tests.Helpers; -using Epam.Kafka.Tests.Common; - -using Microsoft.Extensions.DependencyInjection; - -using Xunit; -using Xunit.Abstractions; - -namespace Epam.Kafka.PubSub.Tests.Subscription.ExternalState; - -public class StateErrorTests : TestWithServices, IClassFixture -{ - private readonly MockCluster _mockCluster; - - public StateErrorTests(ITestOutputHelper output, MockCluster mockCluster) : base(output) - { - this._mockCluster = mockCluster ?? throw new ArgumentNullException(nameof(mockCluster)); - } - - [Fact] - public async Task ErrorOnGet() - { - TopicPartition tp3 = new(this.AnyTopicName, 3); - - TestException exception = new(); - - using TestObserver observer = new(this, 2); - - var handler = new TestSubscriptionHandler(observer); - var offsets = new TestOffsetsStorage(observer); - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - this.Services.AddScoped(_ => offsets); - - observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer) - .WithAssignAndExternalOffsets().WithOptions(x => x.WithTopicPartitions(tp3)); - - offsets.WithGetError(1, exception); - offsets.WithGetError(2, exception); - - await MockCluster.SeedKafka(this, 1, new TopicPartition(this.AnyTopicName, 0)); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - offsets.Verify(); - - // iteration 2 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertStop(exception); - - // iteration 4 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertStop(exception); - } - - [Fact] - public async Task ErrorOnSet() - { - TopicPartition tp3 = new(this.AnyTopicName, 3); - TestException exception = new (); - - using TestObserver observer = new(this, 2); - - var handler = new TestSubscriptionHandler(observer); - var offsets = new TestOffsetsStorage(observer); - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - this.Services.AddScoped(_ => offsets); - - observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer) - .WithAssignAndExternalOffsets().WithOptions(x => x.WithTopicPartitions(tp3)); - - Dictionary m1 = await MockCluster.SeedKafka(this, 5, tp3); - - handler.WithSuccess(1, m1); - handler.WithSuccess(2, m1); - deserializer.WithSuccess(1, m1.Keys.ToArray()); - - var unset = new TopicPartitionOffset(tp3, Offset.Unset); - var offset5 = new TopicPartitionOffset(tp3, 5); - - offsets.WithGet(1, unset); - offsets.WithSetError(1, exception, offset5); - offsets.WithGet(2, unset); - offsets.WithSetError(2, exception, offset5); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - offsets.Verify(); - - // iteration 1 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(5); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertStop(exception); - - // iteration 2 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertProcess(); - observer.AssertCommitExternal(); - observer.AssertStop(exception); - } -} \ No newline at end of file diff --git 
a/tests/Epam.Kafka.PubSub.Tests/Subscription/InternalState/HandlerErrorTests.cs b/tests/Epam.Kafka.PubSub.Tests/Subscription/InternalState/HandlerErrorTests.cs deleted file mode 100644 index 00fef18..0000000 --- a/tests/Epam.Kafka.PubSub.Tests/Subscription/InternalState/HandlerErrorTests.cs +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright © 2024 EPAM Systems - -using Confluent.Kafka; -using Epam.Kafka.PubSub.Subscription.Pipeline; -using Epam.Kafka.PubSub.Tests.Helpers; -using Epam.Kafka.Tests.Common; -using Microsoft.Extensions.DependencyInjection; -using Xunit; -using Xunit.Abstractions; - -namespace Epam.Kafka.PubSub.Tests.Subscription.InternalState; - -[Collection(SubscribeTests.Name)] -public class HandlerErrorTests : TestWithServices -{ - private readonly MockCluster _mockCluster; - - public HandlerErrorTests(ITestOutputHelper output, MockCluster mockCluster) : base(output) - { - this._mockCluster = mockCluster ?? throw new ArgumentNullException(nameof(mockCluster)); - } - - [Fact] - public async Task TransientErrorWithAdaptiveBatchSize() - { - Exception exception = new TestException(); - - TopicPartition tp3 = new(this.AnyTopicName, 3); - - using TestObserver observer = new(this, 8); - - var handler = new TestSubscriptionHandler(observer); - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - - observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer).WithOptions(x => - { - x.BatchSize = 6; - }); - - Dictionary m1 = await MockCluster.SeedKafka(this, 6, tp3); - Dictionary m2 = await MockCluster.SeedKafka(this, 4, tp3); - - handler.WithError(2, exception, m1); - handler.WithError(3, exception, m1); - handler.WithError(4, exception, m1.Take(3)); - handler.WithError(5, exception, m1.Take(1)); - handler.WithSuccess(6, m1.Take(1)); - handler.WithSuccess(7, m1.Skip(1)); - handler.WithSuccess(8, m2); - - deserializer.WithSuccess(2, m1.Keys.ToArray()); - deserializer.WithSuccess(8, m2.Keys.ToArray()); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - - // iteration 1 - observer.AssertSubNotAssigned(); - - // iteration 2 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(6); - observer.AssertProcess(); - observer.AssertStop(exception); - - // iteration 3 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertProcess(); - observer.AssertStop(exception); - - // iteration 4 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertProcess(); - observer.AssertStop(exception); - - // iteration 5 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertProcess(); - observer.AssertStop(exception); - - // iteration 6 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertProcess(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 7 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertProcess(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 8 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(4); - observer.AssertProcess(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - } -} \ No newline at end of file diff --git a/tests/Epam.Kafka.PubSub.Tests/Subscription/InternalState/ReadTests.cs b/tests/Epam.Kafka.PubSub.Tests/Subscription/InternalState/ReadTests.cs deleted file mode 100644 index a806221..0000000 --- 
a/tests/Epam.Kafka.PubSub.Tests/Subscription/InternalState/ReadTests.cs +++ /dev/null @@ -1,117 +0,0 @@ -// Copyright © 2024 EPAM Systems - -using Confluent.Kafka; - -using Epam.Kafka.PubSub.Subscription.Pipeline; -using Epam.Kafka.PubSub.Tests.Helpers; -using Epam.Kafka.Tests.Common; - -using Microsoft.Extensions.DependencyInjection; - -using Xunit; -using Xunit.Abstractions; - -namespace Epam.Kafka.PubSub.Tests.Subscription.InternalState; - -public class ReadTests : TestWithServices, IClassFixture -{ - private readonly MockCluster _mockCluster; - - public ReadTests(ITestOutputHelper output, MockCluster mockCluster) : base(output) - { - this._mockCluster = mockCluster ?? throw new ArgumentNullException(nameof(mockCluster)); - } - - [Fact] - public async Task OneBatchTwoPartitions() - { - TopicPartition tp1 = new(this.AnyTopicName, 1); - TopicPartition tp2 = new(this.AnyTopicName, 2); - - using TestObserver observer = new(this, 3); - - var handler = new TestSubscriptionHandler(observer); - - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - - observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer); - - Dictionary m1 = await MockCluster.SeedKafka(this, 5, tp1); - Dictionary m2 = await MockCluster.SeedKafka(this, 5, tp2); - - handler.WithSuccess(2, m1.Concat(m2)); - deserializer.WithSuccess(2, m1.Keys.ToArray()); - deserializer.WithSuccess(2, m2.Keys.ToArray()); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - - // iteration 1 - observer.AssertSubNotAssigned(); - - // iteration 2 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(10); - observer.AssertProcess(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 3 - observer.AssertSubEmpty(); - } - - [Fact] - public async Task OnePartitionTwoBatches() - { - TopicPartition tp3 = new(this.AnyTopicName, 3); - - using TestObserver observer = new(this, 4); - - var handler = new TestSubscriptionHandler(observer); - var deserializer = new TestDeserializer(observer); - - this.Services.AddScoped(_ => handler); - - observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer).WithOptions(x => x.BatchSize = 5); - - Dictionary m1 = await MockCluster.SeedKafka(this, 5, tp3); - Dictionary m2 = await MockCluster.SeedKafka(this, 5, tp3); - - handler.WithSuccess(2, m1); - handler.WithSuccess(3, m2); - deserializer.WithSuccess(2, m1.Keys.ToArray()); - deserializer.WithSuccess(3, m2.Keys.ToArray()); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - - // iteration 1 - observer.AssertSubNotAssigned(); - - // iteration 2 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(5); - observer.AssertProcess(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 3 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(5); - observer.AssertProcess(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 4 - observer.AssertSubEmpty(); - } -} \ No newline at end of file diff --git a/tests/Epam.Kafka.PubSub.Tests/Subscription/InternalState/SerializationErrorTests.cs b/tests/Epam.Kafka.PubSub.Tests/Subscription/InternalState/SerializationErrorTests.cs deleted file mode 100644 index b7b4524..0000000 --- 
a/tests/Epam.Kafka.PubSub.Tests/Subscription/InternalState/SerializationErrorTests.cs +++ /dev/null @@ -1,129 +0,0 @@ -// Copyright © 2024 EPAM Systems - -using Confluent.Kafka; -using Epam.Kafka.PubSub.Subscription.Pipeline; -using Epam.Kafka.PubSub.Tests.Helpers; -using Epam.Kafka.Tests.Common; - -using Microsoft.Extensions.DependencyInjection; - -using Xunit; -using Xunit.Abstractions; - -namespace Epam.Kafka.PubSub.Tests.Subscription.InternalState; - -[Collection(SubscribeTests.Name)] -public class SerializationErrorTests : TestWithServices -{ - private readonly MockCluster _mockCluster; - - public SerializationErrorTests(ITestOutputHelper output, MockCluster mockCluster) : base(output) - { - this._mockCluster = mockCluster ?? throw new ArgumentNullException(nameof(mockCluster)); - } - - [Fact] - public async Task SinglePartitionAtBeginning() - { - TopicPartition tp3 = new(this.AnyTopicName, 3); - - TestException exc = new(); - using TestObserver observer = new(this, 5); - - TestSubscriptionHandler handler = new(observer); - - TestDeserializer deserializer = new(observer); - - this.Services.AddScoped(_ => handler); - - observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer) - .WithOptions(x => - { - x.BatchNotAssignedTimeout = TimeSpan.FromSeconds(10); - x.BatchRetryCount = 1; - }); - - Dictionary m1 = await MockCluster.SeedKafka(this, 5, tp3); - - deserializer.WithError(2, exc, m1.Keys.First()); - deserializer.WithError(3, exc, m1.Keys.First()); - deserializer.WithError(5, exc, m1.Keys.First()); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - - // iteration 1 - observer.AssertSubNotAssigned(); - - // iteration 2 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(); - observer.AssertStop("Value deserialization error"); - - // iteration 3 batch retry - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(); - observer.AssertStop("Value deserialization error"); - - // iteration 4 partition not assigned until 6 sec session timeout elapsed - observer.AssertSubNotAssigned(); - - // iteration 5 - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(); - observer.AssertStop("Value deserialization error"); - } - - [Fact] - public async Task SinglePartitionInTheMiddleOfBatch() - { - TopicPartition tp1 = new(this.AnyTopicName, 1); - - TestException exc = new(); - using TestObserver observer = new(this, 3); - - TestSubscriptionHandler handler = new(observer); - - TestDeserializer deserializer = new(observer); - - this.Services.AddScoped(_ => handler); - - observer.CreateDefaultSubscription(this._mockCluster).WithValueDeserializer(_ => deserializer) - .WithOptions(x => x.BatchNotAssignedTimeout = TimeSpan.FromSeconds(10)); - - Dictionary m1 = await MockCluster.SeedKafka(this, 5, tp1); - - deserializer.WithSuccess(2, m1.Keys.ElementAt(0)); - deserializer.WithError(2,exc, m1.Keys.ElementAt(1)); - deserializer.WithError(3, exc, m1.Keys.ElementAt(1)); - - handler.WithSuccess(2, m1.Take(1)); - - await this.RunBackgroundServices(); - - deserializer.Verify(); - handler.Verify(); - - // iteration 1 - observer.AssertSubNotAssigned(); - - // iteration 2 process deserialized items before error - observer.AssertStart(); - observer.AssertAssign(); - observer.AssertRead(1); - observer.AssertProcess(); - observer.AssertCommitKafka(); - observer.AssertStop(SubscriptionBatchResult.Processed); - - // iteration 3 - observer.AssertStart(); - observer.AssertAssign(); - 
observer.AssertRead();
-        observer.AssertStop("Value deserialization error");
-    }
-}
\ No newline at end of file
diff --git a/tests/Epam.Kafka.PubSub.Tests/Subscription/SubServiceStartupTests.cs b/tests/Epam.Kafka.PubSub.Tests/Subscription/SubServiceStartupTests.cs
deleted file mode 100644
index 1dd356b..0000000
--- a/tests/Epam.Kafka.PubSub.Tests/Subscription/SubServiceStartupTests.cs
+++ /dev/null
@@ -1,78 +0,0 @@
-// Copyright © 2024 EPAM Systems
-
-using Confluent.Kafka;
-
-using Epam.Kafka.PubSub.Subscription.Options;
-using Epam.Kafka.PubSub.Tests.Helpers;
-using Epam.Kafka.Tests.Common;
-
-using Microsoft.Extensions.Configuration;
-using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Options;
-
-using Shouldly;
-
-using Xunit;
-using Xunit.Abstractions;
-
-namespace Epam.Kafka.PubSub.Tests.Subscription;
-
-public class SubServiceStartupTests : TestWithServices
-{
-    public SubServiceStartupTests(ITestOutputHelper output) : base(output)
-    {
-    }
-
-    [Theory]
-    [InlineData(nameof(SubscriptionOptions.BatchNotAssignedTimeout), "00:11:00", "BatchNotAssignedTimeout greater than '00:10:00'.")]
-    [InlineData(nameof(SubscriptionOptions.BatchPausedTimeout), "01:00:01", "BatchPausedTimeout greater than '00:10:00'.")]
-    [InlineData(nameof(SubscriptionOptions.BatchSize), "-1", "BatchSize less than '0'")]
-    public async Task FailedOptionsValidation(string key, string value, string expectedMessage)
-    {
-        using TestObserver observer = new(this, 1);
-
-        this.ConfigurationBuilder.AddInMemoryCollection(new[]
-            { new KeyValuePair<string, string>($"Kafka:Subscriptions:{observer.Name}:{key}", value) });
-
-        MockCluster.AddMockCluster(this)
-            .AddSubscription(observer.Name, ServiceLifetime.Scoped)
-            .WithOptions(options => { options.Topics = this.AnyTopicName; });
-
-        OptionsValidationException exc =
-            await Assert.ThrowsAsync<OptionsValidationException>(this.RunBackgroundServices);
-
-        exc.Message.ShouldContain(expectedMessage);
-    }
-
-    [Fact]
-    public async Task DefaultValueSerializerNotAvailable()
-    {
-        using TestObserver observer = new(this, 1);
-
-        MockCluster.AddMockCluster(this)
-            .AddSubscription(observer.Name, ServiceLifetime.Scoped)
-            .WithOptions(options => { options.Topics = this.AnyTopicName; });
-
-        OptionsValidationException exc =
-            await Assert.ThrowsAsync<OptionsValidationException>(this.RunBackgroundServices);
-
-        exc.Message.ShouldContain(
-            "Custom deserializer not set for non default value type");
-    }
-
-    [Fact]
-    public async Task ErrorInSerializerFactory()
-    {
-        TestException exception = new();
-
-        using TestObserver observer = new(this, 1);
-
-        MockCluster.AddMockCluster(this)
-            .AddSubscription(observer.Name, ServiceLifetime.Scoped)
-            .WithOptions(options => { options.Topics = this.AnyTopicName; })
-            .WithKeyDeserializer(_ => throw exception).WithValueDeserializer(_ => throw exception);
-
-        TestException exc =
-            await Assert.ThrowsAsync<TestException>(this.RunBackgroundServices);
-    }
-}
\ No newline at end of file
diff --git a/tests/Epam.Kafka.PubSub.Tests/Subscription/SubscriptionHealthCheckTests.cs b/tests/Epam.Kafka.PubSub.Tests/Subscription/SubscriptionHealthCheckTests.cs
deleted file mode 100644
index e7bc06d..0000000
--- a/tests/Epam.Kafka.PubSub.Tests/Subscription/SubscriptionHealthCheckTests.cs
+++ /dev/null
@@ -1,68 +0,0 @@
-// Copyright © 2024 EPAM Systems
-
-using Epam.Kafka.PubSub.Common.Pipeline;
-using Epam.Kafka.PubSub.Subscription.HealthChecks;
-using Epam.Kafka.PubSub.Subscription.Options;
-using Epam.Kafka.PubSub.Subscription.Pipeline;
-
-using Microsoft.Extensions.Diagnostics.HealthChecks;
-
-using Shouldly;
-
-using Xunit;
-
-namespace Epam.Kafka.PubSub.Tests.Subscription; - -public class SubscriptionHealthCheckTests -{ - private static readonly HealthCheckContext Context = new(); - - [Theory] - [InlineData(0, PipelineStatus.None, BatchStatus.None, SubscriptionBatchResult.None, 0, HealthStatus.Degraded)] - [InlineData(0, PipelineStatus.None, BatchStatus.None, SubscriptionBatchResult.None, 600, HealthStatus.Unhealthy)] - [InlineData(0, PipelineStatus.Failed, BatchStatus.None, SubscriptionBatchResult.None, 0, HealthStatus.Unhealthy)] - [InlineData(0, PipelineStatus.RetryTimeout, BatchStatus.None, SubscriptionBatchResult.None, 0, HealthStatus.Unhealthy)] - [InlineData(0, PipelineStatus.Cancelled, BatchStatus.None, SubscriptionBatchResult.None, 0, HealthStatus.Degraded)] - [InlineData(0, PipelineStatus.Disabled, BatchStatus.None, SubscriptionBatchResult.None, 0, HealthStatus.Healthy)] - [InlineData(1, PipelineStatus.Running, BatchStatus.None, SubscriptionBatchResult.None, 0, HealthStatus.Unhealthy)] - [InlineData(0, PipelineStatus.Running, BatchStatus.None, SubscriptionBatchResult.None, 0, HealthStatus.Healthy)] - [InlineData(0, PipelineStatus.Running, BatchStatus.Finished, SubscriptionBatchResult.None, 0, HealthStatus.Healthy)] - [InlineData(0, PipelineStatus.Running, BatchStatus.None, SubscriptionBatchResult.None, 600, HealthStatus.Unhealthy)] - [InlineData(0, PipelineStatus.Running, BatchStatus.Finished, SubscriptionBatchResult.None, 600, HealthStatus.Unhealthy)] - [InlineData(0, PipelineStatus.Running, BatchStatus.Running, SubscriptionBatchResult.None, 0, HealthStatus.Healthy)] - [InlineData(0, PipelineStatus.Running, BatchStatus.Running, SubscriptionBatchResult.None, 600, HealthStatus.Unhealthy)] - [InlineData(0, PipelineStatus.Running, BatchStatus.Reading, SubscriptionBatchResult.None, 0, HealthStatus.Healthy)] - [InlineData(0, PipelineStatus.Running, BatchStatus.Commiting, SubscriptionBatchResult.None, 0, HealthStatus.Healthy)] - [InlineData(0, PipelineStatus.Running, BatchStatus.Queued, SubscriptionBatchResult.None, 0, HealthStatus.Healthy)] - [InlineData(0, PipelineStatus.Running, BatchStatus.Commiting, SubscriptionBatchResult.None, 600, HealthStatus.Unhealthy)] - [InlineData(0, PipelineStatus.Running, BatchStatus.Queued, SubscriptionBatchResult.None, 600, HealthStatus.Unhealthy)] - [InlineData(0, PipelineStatus.Running, BatchStatus.Finished, SubscriptionBatchResult.Error, 0, HealthStatus.Degraded)] - [InlineData(0, PipelineStatus.Running, BatchStatus.Finished, SubscriptionBatchResult.NotAssigned, 0, HealthStatus.Healthy)] - public async Task CheckHealth(int pipelineRetry, PipelineStatus p, BatchStatus b, SubscriptionBatchResult r, - int seconds, HealthStatus expectedStatus) - { - SubscriptionMonitor monitor = new("any"); - - monitor.Pipeline.Update(p); - monitor.Batch.Update(b); - monitor.Result.Update(r); - monitor.PipelineRetryIteration = pipelineRetry; - - var check = new CheckForTest(monitor, seconds); - - HealthCheckResult result = await check.CheckHealthAsync(Context); - result.Status.ShouldBe(expectedStatus); - } - - private class CheckForTest : SubscriptionHealthCheck - { - private readonly int _seconds; - - public CheckForTest(SubscriptionMonitor monitor, int seconds) : base(new SubscriptionOptions(), monitor) - { - this._seconds = seconds; - } - - protected override DateTime UtcNow => DateTime.UtcNow.AddSeconds(this._seconds); - } -} \ No newline at end of file
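The deleted SubscriptionHealthCheckTests above relies on a common testing seam for time-dependent checks: the health check exposes a protected virtual UtcNow property, and a test-only subclass (CheckForTest) shifts it forward to simulate elapsed time. A minimal, self-contained sketch of that seam follows; all type names in it (ClockBasedHealthCheck, ShiftedClockHealthCheck) are illustrative assumptions, not types from this repository.

// Sketch of the UtcNow-override seam; names are hypothetical, not library types.
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Diagnostics.HealthChecks;

public class ClockBasedHealthCheck : IHealthCheck
{
    private readonly DateTime _lastActivityUtc;
    private readonly TimeSpan _unhealthyAfter;

    public ClockBasedHealthCheck(DateTime lastActivityUtc, TimeSpan unhealthyAfter)
    {
        this._lastActivityUtc = lastActivityUtc;
        this._unhealthyAfter = unhealthyAfter;
    }

    // Seam: tests override this instead of waiting for real time to pass.
    protected virtual DateTime UtcNow => DateTime.UtcNow;

    public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        // Report Unhealthy when no activity was observed within the allowed interval.
        HealthCheckResult result = this.UtcNow - this._lastActivityUtc > this._unhealthyAfter
            ? HealthCheckResult.Unhealthy("No activity within the allowed interval.")
            : HealthCheckResult.Healthy();

        return Task.FromResult(result);
    }
}

// Test-only subclass shifts the clock forward by a fixed number of seconds,
// mirroring the CheckForTest override in the deleted test above.
public class ShiftedClockHealthCheck : ClockBasedHealthCheck
{
    private readonly int _seconds;

    public ShiftedClockHealthCheck(DateTime lastActivityUtc, TimeSpan unhealthyAfter, int seconds)
        : base(lastActivityUtc, unhealthyAfter)
    {
        this._seconds = seconds;
    }

    protected override DateTime UtcNow => DateTime.UtcNow.AddSeconds(this._seconds);
}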