Skip to content

Commit

Permalink
ConfigureAwait
Browse files Browse the repository at this point in the history
  • Loading branch information
sanych-sun committed Sep 27, 2023
1 parent f3e50fa commit 6c08e34
Show file tree
Hide file tree
Showing 22 changed files with 65 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ static async Task Main()

var connection = await ConnectionBuilder.Default
.Endpoint("amqp://test2:test2@localhost:5672/")
.ConnectAsync();
.ConnectAsync()
.ConfigureAwait(false);

Console.WriteLine("Connection opened");

Expand All @@ -33,7 +34,7 @@ static async Task Main()
await consumer.ConsumeAsync(async message =>
{
Console.WriteLine($"[{DateTimeOffset.Now.TimeOfDay}] Message received via '{message.Exchange}' exchange: {message.Content<string>()}");
} ,cancellation.Token);
} ,cancellation.Token).ConfigureAwait(false);
}

private static Task MonitorKeypressAsync(CancellationTokenSource cancellation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ static async Task Main()

var connection = await ConnectionBuilder.Default
.Endpoint("amqp://test2:test2@localhost:5672/")
.ConnectAsync();
.ConnectAsync()
.ConfigureAwait(false);

Console.WriteLine("Connection opened");

Expand All @@ -33,7 +34,7 @@ static async Task Main()

try
{
await publisher.PublishAsync(input);
await publisher.PublishAsync(input).ConfigureAwait(false);
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ static async Task Main()

var connection = await ConnectionBuilder.Default
.Endpoint("amqp://test:pass@localhost:5672/")
.ConnectAsync();
.ConnectAsync()
.ConfigureAwait(false);

Console.WriteLine("Connection opened");

Expand Down
8 changes: 4 additions & 4 deletions src/RabbitMQ.Next.Abstractions/ConnectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public static class ConnectionExtensions
public static Task UseChannelAsync<TState>(this IConnection connection, TState state, Func<TState, IChannel, Task> fn)
=> connection.UseChannelAsync((state, fn), async (st, ch) =>
{
await st.fn(st.state, ch);
await st.fn(st.state, ch).ConfigureAwait(false);
return true;
});

Expand All @@ -23,14 +23,14 @@ public static async Task<TResult> UseChannelAsync<TState, TResult>(this IConnect
IChannel channel = null;
try
{
channel = await connection.OpenChannelAsync();
return await fn(state, channel);
channel = await connection.OpenChannelAsync().ConfigureAwait(false);
return await fn(state, channel).ConfigureAwait(false);
}
finally
{
if (channel != null && !channel.Completion.IsCompleted)
{
await channel.CloseAsync();
await channel.CloseAsync().ConfigureAwait(false);
}
}
}
Expand Down
21 changes: 11 additions & 10 deletions src/RabbitMQ.Next.Consumer/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public Consumer(
}

public async ValueTask DisposeAsync()
=> await this.CancelConsumeAsync();
=> await this.CancelConsumeAsync().ConfigureAwait(false);

public async Task ConsumeAsync(Func<IDeliveredMessage, ValueTask> handler, CancellationToken cancellation)
{
Expand All @@ -58,15 +58,15 @@ public async Task ConsumeAsync(Func<IDeliveredMessage, ValueTask> handler, Cance
throw new ArgumentNullException(nameof(handler));
}

await this.InitConsumerAsync(handler);
await this.InitConsumerAsync(handler).ConfigureAwait(false);

try
{
await Task.WhenAny(cancellation.AsTask(), this.channel.Completion);
await Task.WhenAny(cancellation.AsTask(), this.channel.Completion).ConfigureAwait(false);
}
finally
{
await this.CancelConsumeAsync();
await this.CancelConsumeAsync().ConfigureAwait(false);
}
}

Expand All @@ -80,37 +80,38 @@ private async Task CancelConsumeAsync()
for (var i = 0; i < this.queues.Count; i++)
{
var queue = this.queues[i];
await this.channel.SendAsync<CancelMethod, CancelOkMethod>(new CancelMethod(queue.ConsumerTag));
await this.channel.SendAsync<CancelMethod, CancelOkMethod>(new CancelMethod(queue.ConsumerTag)).ConfigureAwait(false);
}

if (this.acknowledgement != null)
{
await this.acknowledgement.DisposeAsync();
await this.acknowledgement.DisposeAsync().ConfigureAwait(false);
}

await this.channel.CloseAsync();
await this.channel.CloseAsync().ConfigureAwait(false);

this.acknowledgement = null;
this.channel = null;
}

private async Task InitConsumerAsync(Func<IDeliveredMessage, ValueTask> handler)
{
this.channel = await this.connection.OpenChannelAsync();
this.channel = await this.connection.OpenChannelAsync().ConfigureAwait(false);
this.acknowledgement = this.acknowledgementFactory(this.channel);

var deliverHandler = new DeliverMessageHandler(handler, this.acknowledgement, this.serializer, this.onPoisonMessage, this.concurrencyLevel);
this.channel.WithMessageHandler(deliverHandler);

await this.channel.SendAsync<QosMethod, QosOkMethod>(new QosMethod(this.prefetchSize, this.prefetchCount, false));
await this.channel.SendAsync<QosMethod, QosOkMethod>(new QosMethod(this.prefetchSize, this.prefetchCount, false)).ConfigureAwait(false);

for (var i = 0; i < this.queues.Count; i++)
{
var queue = this.queues[i];
var response = await this.channel.SendAsync<ConsumeMethod, ConsumeOkMethod>(
new ConsumeMethod(
queue.Queue, queue.ConsumerTag, queue.NoLocal, this.acknowledgement == null,
queue.Exclusive, queue.Arguments));
queue.Exclusive, queue.Arguments))
.ConfigureAwait(false);

queue.ConsumerTag = response.ConsumerTag;
}
Expand Down
8 changes: 4 additions & 4 deletions src/RabbitMQ.Next.Consumer/DeliverMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,19 @@ public void Release(Exception ex = null)
private async Task ProcessDeliveredMessagesAsync()
{
var reader = this.deliverChannel.Reader;
while (await reader.WaitToReadAsync())
while (await reader.WaitToReadAsync().ConfigureAwait(false))
{
while (reader.TryRead(out var delivered))
{
var message = new DeliveredMessage(this.serializer, delivered.method, delivered.payload);
try
{
await this.messageHandler(message);
await this.acknowledgement.AckAsync(delivered.method.DeliveryTag);
await this.messageHandler(message).ConfigureAwait(false);
await this.acknowledgement.AckAsync(delivered.method.DeliveryTag).ConfigureAwait(false);
}
catch (Exception)
{
await this.acknowledgement.NackAsync(delivered.method.DeliveryTag, this.onPoisonMessage == PoisonMessageMode.Requeue);
await this.acknowledgement.NackAsync(delivered.method.DeliveryTag, this.onPoisonMessage == PoisonMessageMode.Requeue).ConfigureAwait(false);
}
finally
{
Expand Down
4 changes: 2 additions & 2 deletions src/RabbitMQ.Next.Publisher/ConfirmMessageMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ public ValueTask InitAsync(IChannel channel, CancellationToken cancellation)

public async ValueTask<ulong> InvokeAsync<TContent>(TContent content, IMessageBuilder message, CancellationToken cancellation)
{
var deliveryTag = await this.next.InvokeAsync(content, message, cancellation);
var deliveryTag = await this.next.InvokeAsync(content, message, cancellation).ConfigureAwait(false);

var confirmed = await this.confirms.WaitForConfirmAsync(deliveryTag, cancellation);
var confirmed = await this.confirms.WaitForConfirmAsync(deliveryTag, cancellation).ConfigureAwait(false);
if (!confirmed)
{
// todo: provide some useful info here
Expand Down
18 changes: 9 additions & 9 deletions src/RabbitMQ.Next.Publisher/Publisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public async ValueTask DisposeAsync()

if (ch != null)
{
await ch.CloseAsync();
await ch.CloseAsync().ConfigureAwait(false);
}
}

Expand All @@ -79,8 +79,8 @@ private async Task InternalPublishAsync<TContent>(TContent content, MessageBuild
{
this.CheckDisposed();

var pipeline = this.publishPipeline ?? await this.InitializeAsync(cancellation);
await pipeline.InvokeAsync(content, message, cancellation);
var pipeline = this.publishPipeline ?? await this.InitializeAsync(cancellation).ConfigureAwait(false);
await pipeline.InvokeAsync(content, message, cancellation).ConfigureAwait(false);
}
finally
{
Expand All @@ -104,7 +104,7 @@ private async ValueTask<IPublishMiddleware> InitializeAsync(CancellationToken ca
throw new InvalidOperationException("Connection should be in Open state to use the API");
}

await this.channelOpenSync.WaitAsync(cancellationToken);
await this.channelOpenSync.WaitAsync(cancellationToken).ConfigureAwait(false);

try
{
Expand All @@ -113,9 +113,9 @@ private async ValueTask<IPublishMiddleware> InitializeAsync(CancellationToken ca
return this.publishPipeline;
}

this.channel = await this.connection.OpenChannelAsync(cancellationToken);
this.channel = await this.connection.OpenChannelAsync(cancellationToken).ConfigureAwait(false);
// ensure target exchange exists
await this.channel.SendAsync<DeclareMethod, DeclareOkMethod>(new DeclareMethod(this.exchange), cancellationToken);
await this.channel.SendAsync<DeclareMethod, DeclareOkMethod>(new DeclareMethod(this.exchange), cancellationToken).ConfigureAwait(false);

IReturnMiddleware returnPipeline = new VoidReturnMiddleware();
for (var i = 0; i < this.returnMiddlewares.Count; i++)
Expand All @@ -126,12 +126,12 @@ private async ValueTask<IPublishMiddleware> InitializeAsync(CancellationToken ca
this.channel.WithMessageHandler(new ReturnMessageHandler(returnPipeline, this.serializer));

this.publishPipeline = new InternalMessagePublisher(this.exchange, this.serializer);
await this.publishPipeline.InitAsync(this.channel, default);
await this.publishPipeline.InitAsync(this.channel, default).ConfigureAwait(false);

for (var i = 0; i < this.publishMiddlewares.Count; i++)
{
this.publishPipeline = this.publishMiddlewares[i].Invoke(this.publishPipeline);
await this.publishPipeline.InitAsync(this.channel, default);
await this.publishPipeline.InitAsync(this.channel, default).ConfigureAwait(false);
}

return this.publishPipeline;
Expand All @@ -140,7 +140,7 @@ private async ValueTask<IPublishMiddleware> InitializeAsync(CancellationToken ca
{
if (this.channel != null)
{
await this.channel.CloseAsync();
await this.channel.CloseAsync().ConfigureAwait(false);
}

throw;
Expand Down
4 changes: 2 additions & 2 deletions src/RabbitMQ.Next.Publisher/ReturnMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ public void Release(Exception ex = null)
private async Task ProcessReturnedMessagesAsync()
{
var reader = this.returnChannel.Reader;
while (await reader.WaitToReadAsync())
while (await reader.WaitToReadAsync().ConfigureAwait(false))
{
while (reader.TryRead(out var returned))
{
try
{
await this.returnPipeline.InvokeAsync(returned, default);
await this.returnPipeline.InvokeAsync(returned, default).ConfigureAwait(false);
}
finally
{
Expand Down
4 changes: 2 additions & 2 deletions src/RabbitMQ.Next.TopologyBuilder/ConnectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static async Task ConfigureAsync(this IConnection connection, Func<ITopol
}

using var topologyBuilder = new TopologyBuilder(connection);
await builder.Invoke(topologyBuilder);
await builder.Invoke(topologyBuilder).ConfigureAwait(false);
}

public static async Task ConfigureAsync<TState>(this IConnection connection, TState state, Func<ITopologyBuilder, TState, Task> builder)
Expand All @@ -24,6 +24,6 @@ public static async Task ConfigureAsync<TState>(this IConnection connection, TSt
}

using var topologyBuilder = new TopologyBuilder(connection);
await builder.Invoke(topologyBuilder, state);
await builder.Invoke(topologyBuilder, state).ConfigureAwait(false);
}
}
2 changes: 0 additions & 2 deletions src/RabbitMQ.Next/Buffers/IMemoryAccessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,4 @@ internal interface IMemoryAccessor : IDisposable
IMemoryAccessor Append(IMemoryAccessor next);

void WriteTo(Stream stream);

void CopyTo(Span<byte> destination);
}
5 changes: 0 additions & 5 deletions src/RabbitMQ.Next/Buffers/MemoryAccessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,4 @@ public void WriteTo(Stream stream)
ArgumentNullException.ThrowIfNull(stream);
stream.Write(this.memory, 0, this.memory.Length);
}

public void CopyTo(Span<byte> destination)
{
this.memory.CopyTo(destination);
}
}
8 changes: 0 additions & 8 deletions src/RabbitMQ.Next/Buffers/PooledMemoryAccessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,6 @@ public void WriteTo(Stream stream)
stream.Write(this.memory, this.offset, this.Size);
}

public void CopyTo(Span<byte> destination)
{
if (this.Size > 0)
{
this.Memory.Span.CopyTo(destination);
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void CheckDisposed()
{
Expand Down
6 changes: 0 additions & 6 deletions src/RabbitMQ.Next/Buffers/SharedMemory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,6 @@ public void WriteTo(Stream stream)
stream.Write(this.owner.memory, this.offset, this.Size);
}

public void CopyTo(Span<byte> destination)
{
this.CheckDisposed();
this.Memory.Span.CopyTo(destination);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void CheckDisposed()
{
Expand Down
12 changes: 6 additions & 6 deletions src/RabbitMQ.Next/Channels/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ public async Task<TResponse> SendAsync<TRequest, TResponse>(TRequest request, Ca
where TRequest : struct, IOutgoingMethod where TResponse : struct, IIncomingMethod
{
var waitTask = this.WaitAsync<TResponse>(cancellation);
await this.SendAsync(request, cancellation);
return await waitTask;
await this.SendAsync(request, cancellation).ConfigureAwait(false);
return await waitTask.ConfigureAwait(false);
}

public async Task<ulong> PublishAsync<TState>(
Expand All @@ -116,7 +116,7 @@ public async Task<ulong> PublishAsync<TState>(
this.messageBuilderPool.Return(messageBuilder);
}

await this.WriteToSocketAsync(memory, cancellation);
await this.WriteToSocketAsync(memory, cancellation).ConfigureAwait(false);
var deliveryTag = Interlocked.Increment(ref this.lastDeliveryTag);

return deliveryTag;
Expand Down Expand Up @@ -157,7 +157,7 @@ public async Task<TMethod> WaitAsync<TMethod>(CancellationToken cancellation = d

try
{
return await waitHandler.WaitTask;
return await waitHandler.WaitTask.ConfigureAwait(false);
}
finally
{
Expand All @@ -167,13 +167,13 @@ public async Task<TMethod> WaitAsync<TMethod>(CancellationToken cancellation = d

public async Task CloseAsync(Exception ex = null)
{
await this.SendAsync<CloseMethod, CloseOkMethod>(new CloseMethod((ushort) ReplyCode.Success, string.Empty, MethodId.Unknown));
await this.SendAsync<CloseMethod, CloseOkMethod>(new CloseMethod((ushort) ReplyCode.Success, string.Empty, MethodId.Unknown)).ConfigureAwait(false);
this.TryComplete(ex);
}

public async Task CloseAsync(ushort statusCode, string description, MethodId failedMethodId)
{
await this.SendAsync<CloseMethod, CloseOkMethod>(new CloseMethod(statusCode, description, failedMethodId));
await this.SendAsync<CloseMethod, CloseOkMethod>(new CloseMethod(statusCode, description, failedMethodId)).ConfigureAwait(false);
this.TryComplete(new ChannelException(statusCode, description, failedMethodId));
}

Expand Down
2 changes: 1 addition & 1 deletion src/RabbitMQ.Next/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public async Task<IConnection> ConnectAsync(ConnectionSettings settings, Cancell
var memoryPool = new DefaultObjectPool<byte[]>(new MemoryPoolPolicy(bufferSize), 100);

var connection = new Connection(settings, memoryPool);
await connection.OpenConnectionAsync(cancellation);
await connection.OpenConnectionAsync(cancellation).ConfigureAwait(false);
return connection;
}
}
6 changes: 0 additions & 6 deletions src/RabbitMQ.Next/RabbitMQ.Next.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,5 @@
<ItemGroup>
<ProjectReference Include="..\RabbitMQ.Next.Abstractions\RabbitMQ.Next.Abstractions.csproj" />
</ItemGroup>

<ItemGroup>
<Compile Remove="Buffers\MemoryBlock.cs" />
<Compile Remove="Buffers\MemoryBlockPool.cs" />
<Compile Remove="Buffers\StaticMemoryAccessor.cs" />
</ItemGroup>

</Project>
Loading

0 comments on commit 6c08e34

Please sign in to comment.