Skip to content

Commit

Permalink
Dispose CancellationTokenSource
Browse files Browse the repository at this point in the history
  • Loading branch information
sanych-sun committed Sep 4, 2024
1 parent 1060b62 commit b19d441
Show file tree
Hide file tree
Showing 8 changed files with 15 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private static async Task ConsumeMessagesAsync(IConnection connection)
.BindToQueue("my-queue")
.PrefetchCount(10));

var cancellation = new CancellationTokenSource(10_000); // simply cancel after 10 seconds
using var cancellation = new CancellationTokenSource(10_000); // simply cancel after 10 seconds
await consumer.ConsumeAsync((message, content) =>
{
Console.WriteLine($"Message content-type: {message.ContentType}, {content.Get<DummyDto>().SomeProperty}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ private static async Task Main()

Console.WriteLine("Consumer created. Press Ctrl+C to exit.");

var cancellation = new CancellationTokenSource();
using var cancellation = new CancellationTokenSource();

MonitorKeypressAsync(cancellation);

Check warning on line 31 in docs/examples/RabbitMQ.Next.Examples.SimpleConsumer/Program.cs

View workflow job for this annotation

GitHub Actions / build

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.

Expand Down
4 changes: 2 additions & 2 deletions src/RabbitMQ.Next.Consumer/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public async Task ConsumeAsync(Func<IDeliveredMessage, IContentAccessor, Task> h

try
{
await Task.WhenAny(cancellation.AsTask(), this.channel.Completion).ConfigureAwait(false);
await this.channel.Completion.WaitAsync(cancellation).ConfigureAwait(false);
}
finally
{
Expand Down Expand Up @@ -127,4 +127,4 @@ private async Task InitConsumerAsync(Func<IDeliveredMessage, IContentAccessor, T
queue.ConsumerTag = response.ConsumerTag;
}
}
}
}
2 changes: 1 addition & 1 deletion src/RabbitMQ.Next.Publisher/ConfirmMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public Task<bool> WaitForConfirmAsync(ulong deliveryTag, CancellationToken cance
this.pendingConfirms.TryRemove(deliveryTag, out _);
}

return tcs.Task.WithCancellation(cancellation);
return tcs.Task.WaitAsync(cancellation);
}

public void Handle(AckMethod method, IPayload payload)
Expand Down
8 changes: 6 additions & 2 deletions src/RabbitMQ.Next/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public async Task<IChannel> OpenChannelAsync(CancellationToken cancellation = de

public async ValueTask DisposeAsync()
{
this.socketIoCancellation?.Dispose();

if (this.State != ConnectionState.Open)
{
return;
Expand Down Expand Up @@ -312,8 +314,10 @@ private static void StartThread(Action threadStart, string threadName)

private static async Task<NegotiationResults> NegotiateConnectionAsync(IChannel channel, ConnectionSettings settings, CancellationToken cancellation)
{
// connection should be forcibly closed if negotiation phase take more then 10s.
cancellation = new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token.Combine(cancellation);
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellation);
// connection should be forcibly closed if negotiation phase take more than 10s.
cts.CancelAfter(TimeSpan.FromSeconds(10));
cancellation = cts.Token;

var startMethod = await channel.WaitAsync<StartMethod>(cancellation).ConfigureAwait(false);
if (!startMethod.Mechanisms.Contains(settings.Auth.Type))
Expand Down
6 changes: 4 additions & 2 deletions src/RabbitMQ.Next/Sockets/EndpointResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Next.Exceptions;
using RabbitMQ.Next.Tasks;

namespace RabbitMQ.Next.Sockets;

internal static class EndpointResolver
{
public static async Task<ISocket> OpenSocketAsync(IReadOnlyList<Endpoint> endpoints, CancellationToken cancellation)
{
cancellation = new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token.Combine(cancellation);
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellation);
cts.CancelAfter(TimeSpan.FromSeconds(10));

cancellation = cts.Token;
Dictionary<Uri, Exception> exceptions = null;

foreach (var endpoint in endpoints)
Expand Down
68 changes: 0 additions & 68 deletions src/RabbitMQ.Next/Tasks/TaskExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,10 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQ.Next.Tasks;

public static class TaskExtensions
{
public static Task AsTask(this CancellationToken cancellation)
{
if (cancellation.IsCancellationRequested)
{
return Task.CompletedTask;
}

var tcs = new TaskCompletionSource();
cancellation.Register(s => ((TaskCompletionSource)s).TrySetResult(), tcs);
return tcs.Task;
}

public static (bool IsCompleted, T Result) Wait<T>(this ValueTask<T> valueTask, TimeSpan? timeout)
{
if (valueTask.IsCompleted)
Expand All @@ -39,59 +26,4 @@ public static (bool IsCompleted, T Result) Wait<T>(this ValueTask<T> valueTask,

return (false, default);
}

public static Task<TResult> WithCancellation<TResult>(this Task<TResult> task, CancellationToken cancellation)
{
if (task.IsCompleted)
{
return task;
}

if (cancellation.IsCancellationRequested)
{
throw new TaskCanceledException();
}

if (!cancellation.CanBeCanceled)
{
return task;
}

return task.WrapTask(cancellation);
}

public static CancellationToken Combine(this CancellationToken token, CancellationToken other)
{
if (token.IsCancellationRequested || other.IsCancellationRequested)
{
throw new TaskCanceledException();
}

if (!token.CanBeCanceled)
{
return other;
}

if (!other.CanBeCanceled)
{
return token;
}

return CancellationTokenSource.CreateLinkedTokenSource(token, other).Token;
}

private static async Task<TResult> WrapTask<TResult>(this Task<TResult> task, CancellationToken cancellation)
{
var cancellationSource = new TaskCompletionSource<bool>();
await using var registration = cancellation.Register(tcs => ((TaskCompletionSource<bool>)tcs).TrySetResult(true), cancellationSource);

await Task.WhenAny(task, cancellationSource.Task).ConfigureAwait(false);

if (cancellation.IsCancellationRequested)
{
throw new TaskCanceledException();
}

return await task.ConfigureAwait(false);
}
}
83 changes: 0 additions & 83 deletions tests/RabbitMQ.Next.Tests/Tasks/TaskExtensionsTests.cs

This file was deleted.

0 comments on commit b19d441

Please sign in to comment.