Skip to content

Commit

Permalink
Explicit connection opening
Browse files Browse the repository at this point in the history
  • Loading branch information
sanych-sun committed Jan 15, 2024
1 parent 872c543 commit c09b06e
Show file tree
Hide file tree
Showing 17 changed files with 86 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ static async Task Main()
{
Console.WriteLine("Hello World! Will try to connect RabbitMQ server with RABBIT-CR-DEMO auth mechanism.");

var connection = await ConnectionBuilder.Default
await using var connection = ConnectionBuilder.Default
.Endpoint("amqp://localhost:5672/")
.WithRabbitCrDemoAuth("guest", "guest")
.ConnectAsync()
.ConfigureAwait(false);
.Build();

await connection.OpenAsync().ConfigureAwait(false);

Console.WriteLine("Connection opened");
Console.WriteLine("Press any key to close the connection");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ class Program
{
static async Task Main()
{
await using var connection = await ConnectionBuilder.Default
await using var connection = ConnectionBuilder.Default
.Endpoint("amqp://test2:test2@localhost:5672/")
.UseDynamicSerializer(serializer => serializer
.When(m => "application/json".Equals(m.ContentType, StringComparison.InvariantCultureIgnoreCase)).UseSystemJsonSerializer()
.When(m => "application/msgpack".Equals(m.ContentType, StringComparison.InvariantCultureIgnoreCase)).UseMessagePackSerializer()
.When(_ => true).UseSystemJsonSerializer()
)
.ConnectAsync();
.Build();

Console.WriteLine("Connection opened");
await PublishMessagesAsync(connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,16 @@ static async Task Main()
{
Console.WriteLine("Hello World! This is consumer based on RabbitMQ.Next library.");

var connection = await ConnectionBuilder.Default
var connection = ConnectionBuilder.Default
.Endpoint("amqp://guest:guest@localhost:5672/")
.UsePlainTextSerializer()
.ConnectAsync()
.ConfigureAwait(false);
.Build();

Console.WriteLine("Connection opened");

await using var consumer = connection.Consumer(
builder => builder
.BindToQueue("test-queue")
.BindToQueue("my-queue")
.PrefetchCount(10));

Console.WriteLine("Consumer created. Press Ctrl+C to exit.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ static async Task Main()
{
Console.WriteLine("Hello World! This is publisher based on RabbitMQ.Next library.");

var connection = await ConnectionBuilder.Default
var connection = ConnectionBuilder.Default
.Endpoint("amqp://guest:guest@localhost:5672/")
.UsePlainTextSerializer()
.ConnectAsync()
.ConfigureAwait(false);
.Build();

Console.WriteLine("Connection opened");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ static async Task Main()
{
Console.WriteLine("Hello World! This is topology builder based on RabbitMQ.Next library.");

var connection = await ConnectionBuilder.Default
.Endpoint("amqp://test:pass@localhost:5672/")
.ConnectAsync()
.ConfigureAwait(false);
var connection = ConnectionBuilder.Default
.Endpoint("amqp://guest:guest@localhost:5672/")
.Build();

Console.WriteLine("Connection opened");

Expand Down
1 change: 0 additions & 1 deletion src/RabbitMQ.Next.Abstractions/Channels/IChannel.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Buffers;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Next.Messaging;
Expand Down
5 changes: 0 additions & 5 deletions src/RabbitMQ.Next.Abstractions/ConnectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@ public static Task UseChannelAsync<TState>(this IConnection connection, TState s

public static async Task<TResult> UseChannelAsync<TState, TResult>(this IConnection connection, TState state, Func<TState, IChannel, Task<TResult>> fn)
{
if (connection.State != ConnectionState.Open)
{
throw new InvalidOperationException("Connection should be open.");
}

IChannel channel = null;
try
{
Expand Down
2 changes: 2 additions & 0 deletions src/RabbitMQ.Next.Abstractions/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ namespace RabbitMQ.Next;

public interface IConnection : IAsyncDisposable
{
Task OpenAsync(CancellationToken cancellationToken = default);

Task<IChannel> OpenChannelAsync(CancellationToken cancellationToken = default);

ConnectionState State { get; }
Expand Down
2 changes: 1 addition & 1 deletion src/RabbitMQ.Next.Abstractions/IConnectionBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ public interface IConnectionBuilder: ISerializationBuilder<IConnectionBuilder>

IConnectionBuilder MaxFrameSize(int sizeBytes);

Task<IConnection> ConnectAsync(CancellationToken cancellation = default);
IConnection Build();
}
5 changes: 0 additions & 5 deletions src/RabbitMQ.Next.Publisher/Publisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,6 @@ private ValueTask<IChannel> GetChannelAsync()

private async ValueTask<IChannel> InitializeChannelAsync()
{
if (this.connection.State != ConnectionState.Open)
{
throw new InvalidOperationException("Connection should be in Open state to use the Publisher API");
}

await this.channelInitSync.WaitAsync().ConfigureAwait(false);

IChannel ch = null;
Expand Down
44 changes: 38 additions & 6 deletions src/RabbitMQ.Next/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using RabbitMQ.Next.Exceptions;
using RabbitMQ.Next.Buffers;
using RabbitMQ.Next.Channels;
using RabbitMQ.Next.Serialization;
using RabbitMQ.Next.Sockets;
using RabbitMQ.Next.Tasks;
using RabbitMQ.Next.Transport;
Expand All @@ -18,6 +17,7 @@ namespace RabbitMQ.Next;

internal class Connection : IConnection
{
private readonly SemaphoreSlim connectionStateLock = new (1, 1);
private readonly ChannelPool channelPool;
private readonly ConnectionDetails connectionDetails;
private readonly Channel<IMemoryAccessor> socketSender;
Expand All @@ -27,8 +27,13 @@ internal class Connection : IConnection
private CancellationTokenSource socketIoCancellation;
private IChannelInternal connectionChannel;

public Connection(ConnectionSettings settings, ObjectPool<byte[]> memoryPool)
public Connection(ConnectionSettings settings)
{
// for best performance and code simplification buffer should fit entire frame
// (frame header + frame payload + frame-end)
var bufferSize = ProtocolConstants.FrameHeaderSize + settings.MaxFrameSize + ProtocolConstants.FrameEndSize;
this.memoryPool = new DefaultObjectPool<byte[]>(new MemoryPoolPolicy(bufferSize), 100);

this.connectionDetails = new ConnectionDetails(settings);
this.socketSender = System.Threading.Channels.Channel.CreateBounded<IMemoryAccessor>(new BoundedChannelOptions(100)
{
Expand All @@ -39,18 +44,22 @@ public Connection(ConnectionSettings settings, ObjectPool<byte[]> memoryPool)
});

this.State = ConnectionState.Closed;
this.memoryPool = memoryPool;
this.channelPool = new ChannelPool(this.CreateChannel);
}

public ConnectionState State { get; private set; }

public async Task OpenAsync(CancellationToken cancellation = default)
{
await this.EnsureConnectionOpenAsync(cancellation).ConfigureAwait(false);
}

public async Task<IChannel> OpenChannelAsync(CancellationToken cancellationToken = default)
public async Task<IChannel> OpenChannelAsync(CancellationToken cancellation = default)
{
// TODO: validate state
await this.EnsureConnectionOpenAsync(cancellation).ConfigureAwait(false);

var channel = this.channelPool.Create();
await channel.SendAsync<Transport.Methods.Channel.OpenMethod, Transport.Methods.Channel.OpenOkMethod>(new Transport.Methods.Channel.OpenMethod(), cancellationToken);
await channel.SendAsync<Transport.Methods.Channel.OpenMethod, Transport.Methods.Channel.OpenOkMethod>(new Transport.Methods.Channel.OpenMethod(), cancellation);

return channel;
}
Expand Down Expand Up @@ -250,6 +259,29 @@ SharedMemory ReceiveNext(int expectedBytes, SharedMemory.MemoryAccessor previous
}
}

private async ValueTask EnsureConnectionOpenAsync(CancellationToken cancellation)
{
if (this.State == ConnectionState.Open)
{
return;
}

await this.connectionStateLock.WaitAsync(cancellation).ConfigureAwait(false);
try
{
if (this.State == ConnectionState.Open)
{
return;
}

await this.OpenConnectionAsync(cancellation).ConfigureAwait(false);
}
finally
{
this.connectionStateLock.Release();
}
}

private void ConnectionClose(Exception ex)
{
if (this.socketIoCancellation == null || this.socketIoCancellation.IsCancellationRequested)
Expand Down
12 changes: 5 additions & 7 deletions src/RabbitMQ.Next/ConnectionBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Next.Serialization;
using RabbitMQ.Next.Transport;

Expand All @@ -13,7 +11,7 @@ public class ConnectionBuilder : IConnectionBuilder
private const string DefaultLocale = "en-US";
private const int DefaultMaxFrameSize = 131_072; // 128kB

private readonly IConnectionFactory factory;
private readonly Func<ConnectionSettings, IConnection> factory;
private readonly List<Endpoint> endpoints = new();
private readonly Dictionary<string, object> clientProperties = new();
private IAuthMechanism authMechanism;
Expand All @@ -23,11 +21,11 @@ public class ConnectionBuilder : IConnectionBuilder
private ISerializer serializer = null; // TODO: Implement noop serializer as default one

private ConnectionBuilder()
: this(ConnectionFactory.Default)
: this(s => new Connection(s))
{
}

internal ConnectionBuilder(IConnectionFactory factory)
internal ConnectionBuilder(Func<ConnectionSettings, IConnection> factory)
{
this.factory = factory;
}
Expand Down Expand Up @@ -96,7 +94,7 @@ public IConnectionBuilder UseSerializer(ISerializer serializer)
return this;
}

public Task<IConnection> ConnectAsync(CancellationToken cancellation = default)
public IConnection Build()
{
var settings = new ConnectionSettings
{
Expand All @@ -109,6 +107,6 @@ public Task<IConnection> ConnectAsync(CancellationToken cancellation = default)
Serializer = this.serializer,
};

return this.factory.ConnectAsync(settings, cancellation);
return this.factory(settings);
}
}
24 changes: 0 additions & 24 deletions src/RabbitMQ.Next/ConnectionFactory.cs

This file was deleted.

9 changes: 0 additions & 9 deletions src/RabbitMQ.Next/IConnectionFactory.cs

This file was deleted.

4 changes: 2 additions & 2 deletions tests/RabbitMQ.Next.Benchmarks/Consumer/ConsumerBenchmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ public class ConsumerBenchmarks
[GlobalSetup]
public async Task Setup()
{
this.connection = await ConnectionBuilder.Default
this.connection = ConnectionBuilder.Default
.Endpoint(Helper.RabbitMqConnection)
.UsePlainTextSerializer()
.ConnectAsync();
.Build();

ConnectionFactory factory = new ConnectionFactory()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@ namespace RabbitMQ.Next.Benchmarks.Publisher;

public class PublishNoConfirmBenchmarks
{
private IConnection connection;
private RabbitMQ.Client.IConnection theirConnection;

[GlobalSetup]
public async Task Setup()
private readonly IConnection connection;
private readonly RabbitMQ.Client.IConnection theirConnection;

public PublishNoConfirmBenchmarks()
{
this.connection = await ConnectionBuilder.Default
this.connection = ConnectionBuilder.Default
.Endpoint(Helper.RabbitMqConnection)
.UsePlainTextSerializer()
.ConnectAsync()
.ConfigureAwait(false);
.Build();

ConnectionFactory factory = new ConnectionFactory();
factory.Uri = Helper.RabbitMqConnection;
Expand Down
Loading

0 comments on commit c09b06e

Please sign in to comment.