diff --git a/tests/Epam.Kafka.Tests/KafkaFactoryTests.cs b/tests/Epam.Kafka.Tests/KafkaFactoryTests.cs index 93e8297..3dc58ea 100644 --- a/tests/Epam.Kafka.Tests/KafkaFactoryTests.cs +++ b/tests/Epam.Kafka.Tests/KafkaFactoryTests.cs @@ -14,6 +14,8 @@ using Xunit; using Xunit.Abstractions; +using static Confluent.Kafka.ConfigPropertyNames; + namespace Epam.Kafka.Tests; public class KafkaFactoryTests : TestWithServices @@ -151,7 +153,7 @@ public void CreateOauthConsumerCustom() configure: b => b.SetOAuthBearerTokenRefreshHandler( (_, _) => { invoked = true; })); - + Assert.NotNull(consumer); consumer.Consume(1000); @@ -337,6 +339,7 @@ public void ObservableConsumerErrors() var errorObs = new Mock>(); var statsObs = new Mock>(); + var parsedObs = new Mock>(); Assert.Throws(() => consumer.ShouldBeAssignableTo>()!.Subscribe(errorObs.Object)) @@ -344,6 +347,44 @@ public void ObservableConsumerErrors() Assert.Throws(() => consumer.ShouldBeAssignableTo>()!.Subscribe(statsObs.Object)) .Message.ShouldContain("Cannot subscribe to statistics because handler was explicitly set"); + + consumer.Dispose(); + + Assert.Throws(() => consumer.ShouldBeAssignableTo>()!.Subscribe(errorObs.Object)); + Assert.Throws(() => consumer.ShouldBeAssignableTo>()!.Subscribe(statsObs.Object)); + Assert.Throws(() => consumer.ShouldBeAssignableTo>()!.Subscribe(parsedObs.Object)); + + List tp = new List { new(string.Empty, 0) }; + List tpo = new List { new(tp[0], 0) }; + + Assert.Throws(() => consumer.Subscription); + Assert.Throws(() => consumer.ConsumerGroupMetadata); + Assert.Throws(() => consumer.Assignment); + Assert.Throws(() => consumer.MemberId); + Assert.Throws(() => consumer.Assign(tp[0])); + Assert.Throws(() => consumer.Assign(tpo[0])); + Assert.Throws(() => consumer.Assign(tp)); + Assert.Throws(() => consumer.Assign(tpo)); + Assert.Throws(() => consumer.Close()); + Assert.Throws(() => consumer.Commit()); + Assert.Throws(() => consumer.Commit(tpo)); + Assert.Throws(() => consumer.Commit(new ConsumeResult())); + Assert.Throws(() => consumer.Committed(TimeSpan.Zero)); + Assert.Throws(() => consumer.Committed(tp, TimeSpan.Zero)); + Assert.Throws(() => consumer.Consume()); + Assert.Throws(() => consumer.Consume(TimeSpan.Zero)); + Assert.Throws(() => consumer.Consume(0)); + Assert.Throws(() => consumer.GetWatermarkOffsets(tp[0])); + Assert.Throws(() => consumer.IncrementalAssign(tp)); + Assert.Throws(() => consumer.IncrementalAssign(tpo)); + Assert.Throws(() => consumer.IncrementalUnassign(tp)); + Assert.Throws(() => consumer.OffsetsForTimes(null, TimeSpan.Zero)); + Assert.Throws(() => consumer.Pause(tp)); + Assert.Throws(() => consumer.Position(tp[0])); + Assert.Throws(() => consumer.Resume(tp)); + Assert.Throws(() => consumer.Seek(tpo[0])); + Assert.Throws(() => consumer.StoreOffset(tpo[0])); + Assert.Throws(() => consumer.StoreOffset(new ConsumeResult())); } [Fact] @@ -366,6 +407,7 @@ public void ObservableProducerErrors() var errorObs = new Mock>(); var statsObs = new Mock>(); + var parsedObs = new Mock>(); Assert.Throws(() => producer.ShouldBeAssignableTo>()!.Subscribe(errorObs.Object)) @@ -374,6 +416,30 @@ public void ObservableProducerErrors() Assert.Throws(() => producer.ShouldBeAssignableTo>()!.Subscribe(statsObs.Object)) .Message.ShouldContain("Cannot subscribe to statistics because handler was explicitly set"); + + producer.Dispose(); + + Assert.Throws(() => producer.ShouldBeAssignableTo>()!.Subscribe(errorObs.Object)); + Assert.Throws(() => producer.ShouldBeAssignableTo>()!.Subscribe(statsObs.Object)); + Assert.Throws(() => producer.ShouldBeAssignableTo>()!.Subscribe(parsedObs.Object)); + + Assert.Throws(() => producer.Name); + Assert.Throws(() => producer.Handle); + Assert.Throws(() => producer.Poll(TimeSpan.Zero)); + Assert.Throws(() => producer.Produce(string.Empty, null)); + Assert.Throws(() => producer.Produce(new TopicPartition(string.Empty, 0), null)); + Assert.Throws(() => producer.AbortTransaction()); + Assert.Throws(() => producer.AbortTransaction(TimeSpan.Zero)); + Assert.Throws(() => producer.CommitTransaction()); + Assert.Throws(() => producer.CommitTransaction(TimeSpan.Zero)); + Assert.Throws(() => producer.BeginTransaction()); + Assert.Throws(() => producer.Flush()); + Assert.Throws(() => producer.Flush(TimeSpan.Zero)); + Assert.Throws(() => producer.SendOffsetsToTransaction(null!, null!, TimeSpan.Zero)); + Assert.Throws(() => producer.AddBrokers(null)); + Assert.Throws(() => producer.SetSaslCredentials(null, null)); + Assert.Throws(() => producer.OAuthBearerSetToken(null, 0, null)); + Assert.Throws(() => producer.OAuthBearerSetTokenFailure(null)); } [Theory]