From c09b06e82ee5e6debbc8a29cf898484d674c97cf Mon Sep 17 00:00:00 2001 From: sanych-sun Date: Sun, 14 Jan 2024 23:31:59 -0800 Subject: [PATCH] Explicit connection opening --- .../Program.cs | 7 +-- .../Program.cs | 4 +- .../Program.cs | 7 ++- .../Program.cs | 5 +-- .../Program.cs | 7 ++- .../Channels/IChannel.cs | 1 - .../ConnectionExtensions.cs | 5 --- src/RabbitMQ.Next.Abstractions/IConnection.cs | 2 + .../IConnectionBuilder.cs | 2 +- src/RabbitMQ.Next.Publisher/Publisher.cs | 5 --- src/RabbitMQ.Next/Connection.cs | 44 ++++++++++++++++--- src/RabbitMQ.Next/ConnectionBuilder.cs | 12 +++-- src/RabbitMQ.Next/ConnectionFactory.cs | 24 ---------- src/RabbitMQ.Next/IConnectionFactory.cs | 9 ---- .../Consumer/ConsumerBenchmarks.cs | 4 +- .../Publisher/PublishNoConfirmBenchmarks.cs | 14 +++--- .../Publisher/PublisherBenchmarks.cs | 44 ++++++++----------- 17 files changed, 86 insertions(+), 110 deletions(-) delete mode 100644 src/RabbitMQ.Next/ConnectionFactory.cs delete mode 100644 src/RabbitMQ.Next/IConnectionFactory.cs diff --git a/docs/examples/RabbitMQ.Next.Examples.DemoSaslAuthMechanism/Program.cs b/docs/examples/RabbitMQ.Next.Examples.DemoSaslAuthMechanism/Program.cs index e6d4456a..712bb32c 100644 --- a/docs/examples/RabbitMQ.Next.Examples.DemoSaslAuthMechanism/Program.cs +++ b/docs/examples/RabbitMQ.Next.Examples.DemoSaslAuthMechanism/Program.cs @@ -9,11 +9,12 @@ static async Task Main() { Console.WriteLine("Hello World! Will try to connect RabbitMQ server with RABBIT-CR-DEMO auth mechanism."); - var connection = await ConnectionBuilder.Default + await using var connection = ConnectionBuilder.Default .Endpoint("amqp://localhost:5672/") .WithRabbitCrDemoAuth("guest", "guest") - .ConnectAsync() - .ConfigureAwait(false); + .Build(); + + await connection.OpenAsync().ConfigureAwait(false); Console.WriteLine("Connection opened"); Console.WriteLine("Press any key to close the connection"); diff --git a/docs/examples/RabbitMQ.Next.Examples.DynamicSerializer/Program.cs b/docs/examples/RabbitMQ.Next.Examples.DynamicSerializer/Program.cs index d1d4b220..9852bc3f 100644 --- a/docs/examples/RabbitMQ.Next.Examples.DynamicSerializer/Program.cs +++ b/docs/examples/RabbitMQ.Next.Examples.DynamicSerializer/Program.cs @@ -13,14 +13,14 @@ class Program { static async Task Main() { - await using var connection = await ConnectionBuilder.Default + await using var connection = ConnectionBuilder.Default .Endpoint("amqp://test2:test2@localhost:5672/") .UseDynamicSerializer(serializer => serializer .When(m => "application/json".Equals(m.ContentType, StringComparison.InvariantCultureIgnoreCase)).UseSystemJsonSerializer() .When(m => "application/msgpack".Equals(m.ContentType, StringComparison.InvariantCultureIgnoreCase)).UseMessagePackSerializer() .When(_ => true).UseSystemJsonSerializer() ) - .ConnectAsync(); + .Build(); Console.WriteLine("Connection opened"); await PublishMessagesAsync(connection); diff --git a/docs/examples/RabbitMQ.Next.Examples.SimpleConsumer/Program.cs b/docs/examples/RabbitMQ.Next.Examples.SimpleConsumer/Program.cs index 6db751cd..8af3b682 100644 --- a/docs/examples/RabbitMQ.Next.Examples.SimpleConsumer/Program.cs +++ b/docs/examples/RabbitMQ.Next.Examples.SimpleConsumer/Program.cs @@ -12,17 +12,16 @@ static async Task Main() { Console.WriteLine("Hello World! This is consumer based on RabbitMQ.Next library."); - var connection = await ConnectionBuilder.Default + var connection = ConnectionBuilder.Default .Endpoint("amqp://guest:guest@localhost:5672/") .UsePlainTextSerializer() - .ConnectAsync() - .ConfigureAwait(false); + .Build(); Console.WriteLine("Connection opened"); await using var consumer = connection.Consumer( builder => builder - .BindToQueue("test-queue") + .BindToQueue("my-queue") .PrefetchCount(10)); Console.WriteLine("Consumer created. Press Ctrl+C to exit."); diff --git a/docs/examples/RabbitMQ.Next.Examples.SimplePublisher/Program.cs b/docs/examples/RabbitMQ.Next.Examples.SimplePublisher/Program.cs index cd7c63d7..ef85d274 100644 --- a/docs/examples/RabbitMQ.Next.Examples.SimplePublisher/Program.cs +++ b/docs/examples/RabbitMQ.Next.Examples.SimplePublisher/Program.cs @@ -13,11 +13,10 @@ static async Task Main() { Console.WriteLine("Hello World! This is publisher based on RabbitMQ.Next library."); - var connection = await ConnectionBuilder.Default + var connection = ConnectionBuilder.Default .Endpoint("amqp://guest:guest@localhost:5672/") .UsePlainTextSerializer() - .ConnectAsync() - .ConfigureAwait(false); + .Build(); Console.WriteLine("Connection opened"); diff --git a/docs/examples/RabbitMQ.Next.Examples.TopologyBuilder/Program.cs b/docs/examples/RabbitMQ.Next.Examples.TopologyBuilder/Program.cs index 032803bd..87987c3c 100644 --- a/docs/examples/RabbitMQ.Next.Examples.TopologyBuilder/Program.cs +++ b/docs/examples/RabbitMQ.Next.Examples.TopologyBuilder/Program.cs @@ -13,10 +13,9 @@ static async Task Main() { Console.WriteLine("Hello World! This is topology builder based on RabbitMQ.Next library."); - var connection = await ConnectionBuilder.Default - .Endpoint("amqp://test:pass@localhost:5672/") - .ConnectAsync() - .ConfigureAwait(false); + var connection = ConnectionBuilder.Default + .Endpoint("amqp://guest:guest@localhost:5672/") + .Build(); Console.WriteLine("Connection opened"); diff --git a/src/RabbitMQ.Next.Abstractions/Channels/IChannel.cs b/src/RabbitMQ.Next.Abstractions/Channels/IChannel.cs index bea15e2d..e49a5da7 100644 --- a/src/RabbitMQ.Next.Abstractions/Channels/IChannel.cs +++ b/src/RabbitMQ.Next.Abstractions/Channels/IChannel.cs @@ -1,5 +1,4 @@ using System; -using System.Buffers; using System.Threading; using System.Threading.Tasks; using RabbitMQ.Next.Messaging; diff --git a/src/RabbitMQ.Next.Abstractions/ConnectionExtensions.cs b/src/RabbitMQ.Next.Abstractions/ConnectionExtensions.cs index 49026f1b..77e11333 100644 --- a/src/RabbitMQ.Next.Abstractions/ConnectionExtensions.cs +++ b/src/RabbitMQ.Next.Abstractions/ConnectionExtensions.cs @@ -15,11 +15,6 @@ public static Task UseChannelAsync(this IConnection connection, TState s public static async Task UseChannelAsync(this IConnection connection, TState state, Func> fn) { - if (connection.State != ConnectionState.Open) - { - throw new InvalidOperationException("Connection should be open."); - } - IChannel channel = null; try { diff --git a/src/RabbitMQ.Next.Abstractions/IConnection.cs b/src/RabbitMQ.Next.Abstractions/IConnection.cs index 612f2a7a..bebb7512 100644 --- a/src/RabbitMQ.Next.Abstractions/IConnection.cs +++ b/src/RabbitMQ.Next.Abstractions/IConnection.cs @@ -7,6 +7,8 @@ namespace RabbitMQ.Next; public interface IConnection : IAsyncDisposable { + Task OpenAsync(CancellationToken cancellationToken = default); + Task OpenChannelAsync(CancellationToken cancellationToken = default); ConnectionState State { get; } diff --git a/src/RabbitMQ.Next.Abstractions/IConnectionBuilder.cs b/src/RabbitMQ.Next.Abstractions/IConnectionBuilder.cs index aac9d498..d61f346f 100644 --- a/src/RabbitMQ.Next.Abstractions/IConnectionBuilder.cs +++ b/src/RabbitMQ.Next.Abstractions/IConnectionBuilder.cs @@ -18,5 +18,5 @@ public interface IConnectionBuilder: ISerializationBuilder IConnectionBuilder MaxFrameSize(int sizeBytes); - Task ConnectAsync(CancellationToken cancellation = default); + IConnection Build(); } \ No newline at end of file diff --git a/src/RabbitMQ.Next.Publisher/Publisher.cs b/src/RabbitMQ.Next.Publisher/Publisher.cs index 28d1f258..6ff68cb7 100644 --- a/src/RabbitMQ.Next.Publisher/Publisher.cs +++ b/src/RabbitMQ.Next.Publisher/Publisher.cs @@ -149,11 +149,6 @@ private ValueTask GetChannelAsync() private async ValueTask InitializeChannelAsync() { - if (this.connection.State != ConnectionState.Open) - { - throw new InvalidOperationException("Connection should be in Open state to use the Publisher API"); - } - await this.channelInitSync.WaitAsync().ConfigureAwait(false); IChannel ch = null; diff --git a/src/RabbitMQ.Next/Connection.cs b/src/RabbitMQ.Next/Connection.cs index 4fd06c5b..bc809d60 100644 --- a/src/RabbitMQ.Next/Connection.cs +++ b/src/RabbitMQ.Next/Connection.cs @@ -7,7 +7,6 @@ using RabbitMQ.Next.Exceptions; using RabbitMQ.Next.Buffers; using RabbitMQ.Next.Channels; -using RabbitMQ.Next.Serialization; using RabbitMQ.Next.Sockets; using RabbitMQ.Next.Tasks; using RabbitMQ.Next.Transport; @@ -18,6 +17,7 @@ namespace RabbitMQ.Next; internal class Connection : IConnection { + private readonly SemaphoreSlim connectionStateLock = new (1, 1); private readonly ChannelPool channelPool; private readonly ConnectionDetails connectionDetails; private readonly Channel socketSender; @@ -27,8 +27,13 @@ internal class Connection : IConnection private CancellationTokenSource socketIoCancellation; private IChannelInternal connectionChannel; - public Connection(ConnectionSettings settings, ObjectPool memoryPool) + public Connection(ConnectionSettings settings) { + // for best performance and code simplification buffer should fit entire frame + // (frame header + frame payload + frame-end) + var bufferSize = ProtocolConstants.FrameHeaderSize + settings.MaxFrameSize + ProtocolConstants.FrameEndSize; + this.memoryPool = new DefaultObjectPool(new MemoryPoolPolicy(bufferSize), 100); + this.connectionDetails = new ConnectionDetails(settings); this.socketSender = System.Threading.Channels.Channel.CreateBounded(new BoundedChannelOptions(100) { @@ -39,18 +44,22 @@ public Connection(ConnectionSettings settings, ObjectPool memoryPool) }); this.State = ConnectionState.Closed; - this.memoryPool = memoryPool; this.channelPool = new ChannelPool(this.CreateChannel); } public ConnectionState State { get; private set; } + + public async Task OpenAsync(CancellationToken cancellation = default) + { + await this.EnsureConnectionOpenAsync(cancellation).ConfigureAwait(false); + } - public async Task OpenChannelAsync(CancellationToken cancellationToken = default) + public async Task OpenChannelAsync(CancellationToken cancellation = default) { - // TODO: validate state + await this.EnsureConnectionOpenAsync(cancellation).ConfigureAwait(false); var channel = this.channelPool.Create(); - await channel.SendAsync(new Transport.Methods.Channel.OpenMethod(), cancellationToken); + await channel.SendAsync(new Transport.Methods.Channel.OpenMethod(), cancellation); return channel; } @@ -250,6 +259,29 @@ SharedMemory ReceiveNext(int expectedBytes, SharedMemory.MemoryAccessor previous } } + private async ValueTask EnsureConnectionOpenAsync(CancellationToken cancellation) + { + if (this.State == ConnectionState.Open) + { + return; + } + + await this.connectionStateLock.WaitAsync(cancellation).ConfigureAwait(false); + try + { + if (this.State == ConnectionState.Open) + { + return; + } + + await this.OpenConnectionAsync(cancellation).ConfigureAwait(false); + } + finally + { + this.connectionStateLock.Release(); + } + } + private void ConnectionClose(Exception ex) { if (this.socketIoCancellation == null || this.socketIoCancellation.IsCancellationRequested) diff --git a/src/RabbitMQ.Next/ConnectionBuilder.cs b/src/RabbitMQ.Next/ConnectionBuilder.cs index 35793470..97fc1f41 100644 --- a/src/RabbitMQ.Next/ConnectionBuilder.cs +++ b/src/RabbitMQ.Next/ConnectionBuilder.cs @@ -1,8 +1,6 @@ using System; using System.Collections.Generic; using System.Reflection; -using System.Threading; -using System.Threading.Tasks; using RabbitMQ.Next.Serialization; using RabbitMQ.Next.Transport; @@ -13,7 +11,7 @@ public class ConnectionBuilder : IConnectionBuilder private const string DefaultLocale = "en-US"; private const int DefaultMaxFrameSize = 131_072; // 128kB - private readonly IConnectionFactory factory; + private readonly Func factory; private readonly List endpoints = new(); private readonly Dictionary clientProperties = new(); private IAuthMechanism authMechanism; @@ -23,11 +21,11 @@ public class ConnectionBuilder : IConnectionBuilder private ISerializer serializer = null; // TODO: Implement noop serializer as default one private ConnectionBuilder() - : this(ConnectionFactory.Default) + : this(s => new Connection(s)) { } - internal ConnectionBuilder(IConnectionFactory factory) + internal ConnectionBuilder(Func factory) { this.factory = factory; } @@ -96,7 +94,7 @@ public IConnectionBuilder UseSerializer(ISerializer serializer) return this; } - public Task ConnectAsync(CancellationToken cancellation = default) + public IConnection Build() { var settings = new ConnectionSettings { @@ -109,6 +107,6 @@ public Task ConnectAsync(CancellationToken cancellation = default) Serializer = this.serializer, }; - return this.factory.ConnectAsync(settings, cancellation); + return this.factory(settings); } } \ No newline at end of file diff --git a/src/RabbitMQ.Next/ConnectionFactory.cs b/src/RabbitMQ.Next/ConnectionFactory.cs deleted file mode 100644 index 11a15fcd..00000000 --- a/src/RabbitMQ.Next/ConnectionFactory.cs +++ /dev/null @@ -1,24 +0,0 @@ -using System.Threading; -using System.Threading.Tasks; -using Microsoft.Extensions.ObjectPool; -using RabbitMQ.Next.Buffers; -using RabbitMQ.Next.Transport; - -namespace RabbitMQ.Next; - -internal sealed class ConnectionFactory : IConnectionFactory -{ - public static readonly IConnectionFactory Default = new ConnectionFactory(); - - public async Task ConnectAsync(ConnectionSettings settings, CancellationToken cancellation = default) - { - // for best performance and code simplification buffer should fit entire frame - // (frame header + frame payload + frame-end) - var bufferSize = ProtocolConstants.FrameHeaderSize + settings.MaxFrameSize + ProtocolConstants.FrameEndSize; - var memoryPool = new DefaultObjectPool(new MemoryPoolPolicy(bufferSize), 100); - - var connection = new Connection(settings, memoryPool); - await connection.OpenConnectionAsync(cancellation).ConfigureAwait(false); - return connection; - } -} \ No newline at end of file diff --git a/src/RabbitMQ.Next/IConnectionFactory.cs b/src/RabbitMQ.Next/IConnectionFactory.cs deleted file mode 100644 index 4d9a0029..00000000 --- a/src/RabbitMQ.Next/IConnectionFactory.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System.Threading; -using System.Threading.Tasks; - -namespace RabbitMQ.Next; - -internal interface IConnectionFactory -{ - Task ConnectAsync(ConnectionSettings settings, CancellationToken cancellation = default); -} \ No newline at end of file diff --git a/tests/RabbitMQ.Next.Benchmarks/Consumer/ConsumerBenchmarks.cs b/tests/RabbitMQ.Next.Benchmarks/Consumer/ConsumerBenchmarks.cs index ba12e163..4aa562dd 100644 --- a/tests/RabbitMQ.Next.Benchmarks/Consumer/ConsumerBenchmarks.cs +++ b/tests/RabbitMQ.Next.Benchmarks/Consumer/ConsumerBenchmarks.cs @@ -25,10 +25,10 @@ public class ConsumerBenchmarks [GlobalSetup] public async Task Setup() { - this.connection = await ConnectionBuilder.Default + this.connection = ConnectionBuilder.Default .Endpoint(Helper.RabbitMqConnection) .UsePlainTextSerializer() - .ConnectAsync(); + .Build(); ConnectionFactory factory = new ConnectionFactory() { diff --git a/tests/RabbitMQ.Next.Benchmarks/Publisher/PublishNoConfirmBenchmarks.cs b/tests/RabbitMQ.Next.Benchmarks/Publisher/PublishNoConfirmBenchmarks.cs index 36bb86e8..4f942b16 100644 --- a/tests/RabbitMQ.Next.Benchmarks/Publisher/PublishNoConfirmBenchmarks.cs +++ b/tests/RabbitMQ.Next.Benchmarks/Publisher/PublishNoConfirmBenchmarks.cs @@ -12,17 +12,15 @@ namespace RabbitMQ.Next.Benchmarks.Publisher; public class PublishNoConfirmBenchmarks { - private IConnection connection; - private RabbitMQ.Client.IConnection theirConnection; - - [GlobalSetup] - public async Task Setup() + private readonly IConnection connection; + private readonly RabbitMQ.Client.IConnection theirConnection; + + public PublishNoConfirmBenchmarks() { - this.connection = await ConnectionBuilder.Default + this.connection = ConnectionBuilder.Default .Endpoint(Helper.RabbitMqConnection) .UsePlainTextSerializer() - .ConnectAsync() - .ConfigureAwait(false); + .Build(); ConnectionFactory factory = new ConnectionFactory(); factory.Uri = Helper.RabbitMqConnection; diff --git a/tests/RabbitMQ.Next.Benchmarks/Publisher/PublisherBenchmarks.cs b/tests/RabbitMQ.Next.Benchmarks/Publisher/PublisherBenchmarks.cs index e3c8da3c..8b8ae163 100644 --- a/tests/RabbitMQ.Next.Benchmarks/Publisher/PublisherBenchmarks.cs +++ b/tests/RabbitMQ.Next.Benchmarks/Publisher/PublisherBenchmarks.cs @@ -12,9 +12,21 @@ namespace RabbitMQ.Next.Benchmarks.Publisher; public class PublisherBenchmarks { - private IConnection connection; - private RabbitMQ.Client.IConnection theirConnection; + private readonly IConnection connection; + private readonly RabbitMQ.Client.IConnection theirConnection; + public PublisherBenchmarks() + { + ConnectionFactory factory = new ConnectionFactory(); + factory.Uri = Helper.RabbitMqConnection; + this.theirConnection = factory.CreateConnection(); + + this.connection = ConnectionBuilder.Default + .Endpoint(Helper.RabbitMqConnection) + .UsePlainTextSerializer() + .Build(); + } + [Benchmark(Baseline = true)] [ArgumentsSource(nameof(TestCases))] public void PublishBaseLibrary(TestCaseParameters parameters) @@ -75,34 +87,14 @@ await publisher.PublishAsync(data, data.Payload, await publisher.DisposeAsync().ConfigureAwait(false); } - [GlobalSetup(Target = nameof(PublishBaseLibrary))] - public void SetupOfficialLibrary() - { - ConnectionFactory factory = new ConnectionFactory(); - factory.Uri = Helper.RabbitMqConnection; - - this.theirConnection = factory.CreateConnection(); - } - - [GlobalCleanup(Target = nameof(PublishBaseLibrary))] - public void CleanUpOfficialLibrary() + [GlobalCleanup()] + public async ValueTask CleanUpOfficialLibrary() { this.theirConnection.Close(); this.theirConnection.Dispose(); + await this.connection.DisposeAsync(); } - - [GlobalSetup(Targets = new[] {nameof(PublishParallelAsync), nameof(PublishAsync)})] - public async Task Setup() - { - this.connection = await ConnectionBuilder.Default - .Endpoint(Helper.RabbitMqConnection) - .UsePlainTextSerializer() - .ConnectAsync(); - } - - [GlobalCleanup(Targets = new[] {nameof(PublishParallelAsync), nameof(PublishAsync)})] - public ValueTask CleanUp() => this.connection.DisposeAsync(); - + public static IEnumerable TestCases() { TestCaseParameters GenerateTestCase(int payloadLen, int count, string name)