Skip to content

Commit

Permalink
Improve statistics metrics #63
Browse files Browse the repository at this point in the history
  • Loading branch information
IharYakimush committed Jan 29, 2025
1 parent f867dd1 commit ea70c90
Showing 1 changed file with 67 additions and 1 deletion.
68 changes: 67 additions & 1 deletion tests/Epam.Kafka.Tests/KafkaFactoryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
using Xunit;
using Xunit.Abstractions;

using static Confluent.Kafka.ConfigPropertyNames;

namespace Epam.Kafka.Tests;

public class KafkaFactoryTests : TestWithServices
Expand Down Expand Up @@ -151,7 +153,7 @@ public void CreateOauthConsumerCustom()
configure: b =>
b.SetOAuthBearerTokenRefreshHandler(
(_, _) => { invoked = true; }));

Assert.NotNull(consumer);

consumer.Consume(1000);
Expand Down Expand Up @@ -337,13 +339,52 @@ public void ObservableConsumerErrors()

var errorObs = new Mock<IObserver<Error>>();
var statsObs = new Mock<IObserver<string>>();
var parsedObs = new Mock<IObserver<Statistics>>();

Assert.Throws<InvalidOperationException>(() =>
consumer.ShouldBeAssignableTo<IObservable<Error>>()!.Subscribe(errorObs.Object))
.Message.ShouldContain("Cannot subscribe to errors because handler was explicitly set");
Assert.Throws<InvalidOperationException>(() =>
consumer.ShouldBeAssignableTo<IObservable<string>>()!.Subscribe(statsObs.Object))
.Message.ShouldContain("Cannot subscribe to statistics because handler was explicitly set");

consumer.Dispose();

Assert.Throws<ObjectDisposedException>(() => consumer.ShouldBeAssignableTo<IObservable<Error>>()!.Subscribe(errorObs.Object));
Assert.Throws<ObjectDisposedException>(() => consumer.ShouldBeAssignableTo<IObservable<string>>()!.Subscribe(statsObs.Object));
Assert.Throws<ObjectDisposedException>(() => consumer.ShouldBeAssignableTo<IObservable<Statistics>>()!.Subscribe(parsedObs.Object));

List<TopicPartition> tp = new List<TopicPartition> { new(string.Empty, 0) };
List<TopicPartitionOffset> tpo = new List<TopicPartitionOffset> { new(tp[0], 0) };

Assert.Throws<ObjectDisposedException>(() => consumer.Subscription);
Assert.Throws<ObjectDisposedException>(() => consumer.ConsumerGroupMetadata);
Assert.Throws<ObjectDisposedException>(() => consumer.Assignment);
Assert.Throws<ObjectDisposedException>(() => consumer.MemberId);
Assert.Throws<ObjectDisposedException>(() => consumer.Assign(tp[0]));
Assert.Throws<ObjectDisposedException>(() => consumer.Assign(tpo[0]));
Assert.Throws<ObjectDisposedException>(() => consumer.Assign(tp));
Assert.Throws<ObjectDisposedException>(() => consumer.Assign(tpo));
Assert.Throws<ObjectDisposedException>(() => consumer.Close());
Assert.Throws<ObjectDisposedException>(() => consumer.Commit());
Assert.Throws<ObjectDisposedException>(() => consumer.Commit(tpo));
Assert.Throws<ObjectDisposedException>(() => consumer.Commit(new ConsumeResult<string, string>()));
Assert.Throws<ObjectDisposedException>(() => consumer.Committed(TimeSpan.Zero));
Assert.Throws<ObjectDisposedException>(() => consumer.Committed(tp, TimeSpan.Zero));
Assert.Throws<ObjectDisposedException>(() => consumer.Consume());
Assert.Throws<ObjectDisposedException>(() => consumer.Consume(TimeSpan.Zero));
Assert.Throws<ObjectDisposedException>(() => consumer.Consume(0));
Assert.Throws<ObjectDisposedException>(() => consumer.GetWatermarkOffsets(tp[0]));
Assert.Throws<ObjectDisposedException>(() => consumer.IncrementalAssign(tp));
Assert.Throws<ObjectDisposedException>(() => consumer.IncrementalAssign(tpo));
Assert.Throws<ObjectDisposedException>(() => consumer.IncrementalUnassign(tp));
Assert.Throws<ObjectDisposedException>(() => consumer.OffsetsForTimes(null, TimeSpan.Zero));
Assert.Throws<ObjectDisposedException>(() => consumer.Pause(tp));
Assert.Throws<ObjectDisposedException>(() => consumer.Position(tp[0]));
Assert.Throws<ObjectDisposedException>(() => consumer.Resume(tp));
Assert.Throws<ObjectDisposedException>(() => consumer.Seek(tpo[0]));
Assert.Throws<ObjectDisposedException>(() => consumer.StoreOffset(tpo[0]));
Assert.Throws<ObjectDisposedException>(() => consumer.StoreOffset(new ConsumeResult<string, string>()));
}

[Fact]
Expand All @@ -366,6 +407,7 @@ public void ObservableProducerErrors()

var errorObs = new Mock<IObserver<Error>>();
var statsObs = new Mock<IObserver<string>>();
var parsedObs = new Mock<IObserver<Statistics>>();

Assert.Throws<InvalidOperationException>(() =>
producer.ShouldBeAssignableTo<IObservable<Error>>()!.Subscribe(errorObs.Object))
Expand All @@ -374,6 +416,30 @@ public void ObservableProducerErrors()
Assert.Throws<InvalidOperationException>(() =>
producer.ShouldBeAssignableTo<IObservable<string>>()!.Subscribe(statsObs.Object))
.Message.ShouldContain("Cannot subscribe to statistics because handler was explicitly set");

producer.Dispose();

Assert.Throws<ObjectDisposedException>(() => producer.ShouldBeAssignableTo<IObservable<Error>>()!.Subscribe(errorObs.Object));
Assert.Throws<ObjectDisposedException>(() => producer.ShouldBeAssignableTo<IObservable<string>>()!.Subscribe(statsObs.Object));
Assert.Throws<ObjectDisposedException>(() => producer.ShouldBeAssignableTo<IObservable<Statistics>>()!.Subscribe(parsedObs.Object));

Assert.Throws<ObjectDisposedException>(() => producer.Name);
Assert.Throws<ObjectDisposedException>(() => producer.Handle);
Assert.Throws<ObjectDisposedException>(() => producer.Poll(TimeSpan.Zero));
Assert.Throws<ObjectDisposedException>(() => producer.Produce(string.Empty, null));
Assert.Throws<ObjectDisposedException>(() => producer.Produce(new TopicPartition(string.Empty, 0), null));
Assert.Throws<ObjectDisposedException>(() => producer.AbortTransaction());
Assert.Throws<ObjectDisposedException>(() => producer.AbortTransaction(TimeSpan.Zero));
Assert.Throws<ObjectDisposedException>(() => producer.CommitTransaction());
Assert.Throws<ObjectDisposedException>(() => producer.CommitTransaction(TimeSpan.Zero));
Assert.Throws<ObjectDisposedException>(() => producer.BeginTransaction());
Assert.Throws<ObjectDisposedException>(() => producer.Flush());
Assert.Throws<ObjectDisposedException>(() => producer.Flush(TimeSpan.Zero));
Assert.Throws<ObjectDisposedException>(() => producer.SendOffsetsToTransaction(null!, null!, TimeSpan.Zero));
Assert.Throws<ObjectDisposedException>(() => producer.AddBrokers(null));
Assert.Throws<ObjectDisposedException>(() => producer.SetSaslCredentials(null, null));
Assert.Throws<ObjectDisposedException>(() => producer.OAuthBearerSetToken(null, 0, null));
Assert.Throws<ObjectDisposedException>(() => producer.OAuthBearerSetTokenFailure(null));
}

[Theory]
Expand Down

0 comments on commit ea70c90

Please sign in to comment.