From b19d441b19fcfe8d6a7b323f93ce72beb569ab89 Mon Sep 17 00:00:00 2001 From: sanych-sun Date: Tue, 3 Sep 2024 20:56:00 -0700 Subject: [PATCH] Dispose CancellationTokenSource --- .../Program.cs | 2 +- .../Program.cs | 2 +- src/RabbitMQ.Next.Consumer/Consumer.cs | 4 +- .../ConfirmMessageHandler.cs | 2 +- src/RabbitMQ.Next/Connection.cs | 8 +- src/RabbitMQ.Next/Sockets/EndpointResolver.cs | 6 +- src/RabbitMQ.Next/Tasks/TaskExtensions.cs | 68 --------------- .../Tasks/TaskExtensionsTests.cs | 83 ------------------- 8 files changed, 15 insertions(+), 160 deletions(-) delete mode 100644 tests/RabbitMQ.Next.Tests/Tasks/TaskExtensionsTests.cs diff --git a/docs/examples/RabbitMQ.Next.Examples.DynamicSerializer/Program.cs b/docs/examples/RabbitMQ.Next.Examples.DynamicSerializer/Program.cs index fe993fe..91318e0 100644 --- a/docs/examples/RabbitMQ.Next.Examples.DynamicSerializer/Program.cs +++ b/docs/examples/RabbitMQ.Next.Examples.DynamicSerializer/Program.cs @@ -55,7 +55,7 @@ private static async Task ConsumeMessagesAsync(IConnection connection) .BindToQueue("my-queue") .PrefetchCount(10)); - var cancellation = new CancellationTokenSource(10_000); // simply cancel after 10 seconds + using var cancellation = new CancellationTokenSource(10_000); // simply cancel after 10 seconds await consumer.ConsumeAsync((message, content) => { Console.WriteLine($"Message content-type: {message.ContentType}, {content.Get().SomeProperty}"); diff --git a/docs/examples/RabbitMQ.Next.Examples.SimpleConsumer/Program.cs b/docs/examples/RabbitMQ.Next.Examples.SimpleConsumer/Program.cs index a8d102b..511fd0a 100644 --- a/docs/examples/RabbitMQ.Next.Examples.SimpleConsumer/Program.cs +++ b/docs/examples/RabbitMQ.Next.Examples.SimpleConsumer/Program.cs @@ -26,7 +26,7 @@ private static async Task Main() Console.WriteLine("Consumer created. Press Ctrl+C to exit."); - var cancellation = new CancellationTokenSource(); + using var cancellation = new CancellationTokenSource(); MonitorKeypressAsync(cancellation); diff --git a/src/RabbitMQ.Next.Consumer/Consumer.cs b/src/RabbitMQ.Next.Consumer/Consumer.cs index 0480cac..869f3d2 100644 --- a/src/RabbitMQ.Next.Consumer/Consumer.cs +++ b/src/RabbitMQ.Next.Consumer/Consumer.cs @@ -73,7 +73,7 @@ public async Task ConsumeAsync(Func h try { - await Task.WhenAny(cancellation.AsTask(), this.channel.Completion).ConfigureAwait(false); + await this.channel.Completion.WaitAsync(cancellation).ConfigureAwait(false); } finally { @@ -127,4 +127,4 @@ private async Task InitConsumerAsync(Func WaitForConfirmAsync(ulong deliveryTag, CancellationToken cance this.pendingConfirms.TryRemove(deliveryTag, out _); } - return tcs.Task.WithCancellation(cancellation); + return tcs.Task.WaitAsync(cancellation); } public void Handle(AckMethod method, IPayload payload) diff --git a/src/RabbitMQ.Next/Connection.cs b/src/RabbitMQ.Next/Connection.cs index 0cbea13..9faff0f 100644 --- a/src/RabbitMQ.Next/Connection.cs +++ b/src/RabbitMQ.Next/Connection.cs @@ -66,6 +66,8 @@ public async Task OpenChannelAsync(CancellationToken cancellation = de public async ValueTask DisposeAsync() { + this.socketIoCancellation?.Dispose(); + if (this.State != ConnectionState.Open) { return; @@ -312,8 +314,10 @@ private static void StartThread(Action threadStart, string threadName) private static async Task NegotiateConnectionAsync(IChannel channel, ConnectionSettings settings, CancellationToken cancellation) { - // connection should be forcibly closed if negotiation phase take more then 10s. - cancellation = new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token.Combine(cancellation); + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellation); + // connection should be forcibly closed if negotiation phase take more than 10s. + cts.CancelAfter(TimeSpan.FromSeconds(10)); + cancellation = cts.Token; var startMethod = await channel.WaitAsync(cancellation).ConfigureAwait(false); if (!startMethod.Mechanisms.Contains(settings.Auth.Type)) diff --git a/src/RabbitMQ.Next/Sockets/EndpointResolver.cs b/src/RabbitMQ.Next/Sockets/EndpointResolver.cs index 2474e1e..366b391 100644 --- a/src/RabbitMQ.Next/Sockets/EndpointResolver.cs +++ b/src/RabbitMQ.Next/Sockets/EndpointResolver.cs @@ -8,7 +8,6 @@ using System.Threading; using System.Threading.Tasks; using RabbitMQ.Next.Exceptions; -using RabbitMQ.Next.Tasks; namespace RabbitMQ.Next.Sockets; @@ -16,7 +15,10 @@ internal static class EndpointResolver { public static async Task OpenSocketAsync(IReadOnlyList endpoints, CancellationToken cancellation) { - cancellation = new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token.Combine(cancellation); + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellation); + cts.CancelAfter(TimeSpan.FromSeconds(10)); + + cancellation = cts.Token; Dictionary exceptions = null; foreach (var endpoint in endpoints) diff --git a/src/RabbitMQ.Next/Tasks/TaskExtensions.cs b/src/RabbitMQ.Next/Tasks/TaskExtensions.cs index 759191c..9aba8da 100644 --- a/src/RabbitMQ.Next/Tasks/TaskExtensions.cs +++ b/src/RabbitMQ.Next/Tasks/TaskExtensions.cs @@ -1,23 +1,10 @@ using System; -using System.Threading; using System.Threading.Tasks; namespace RabbitMQ.Next.Tasks; public static class TaskExtensions { - public static Task AsTask(this CancellationToken cancellation) - { - if (cancellation.IsCancellationRequested) - { - return Task.CompletedTask; - } - - var tcs = new TaskCompletionSource(); - cancellation.Register(s => ((TaskCompletionSource)s).TrySetResult(), tcs); - return tcs.Task; - } - public static (bool IsCompleted, T Result) Wait(this ValueTask valueTask, TimeSpan? timeout) { if (valueTask.IsCompleted) @@ -39,59 +26,4 @@ public static (bool IsCompleted, T Result) Wait(this ValueTask valueTask, return (false, default); } - - public static Task WithCancellation(this Task task, CancellationToken cancellation) - { - if (task.IsCompleted) - { - return task; - } - - if (cancellation.IsCancellationRequested) - { - throw new TaskCanceledException(); - } - - if (!cancellation.CanBeCanceled) - { - return task; - } - - return task.WrapTask(cancellation); - } - - public static CancellationToken Combine(this CancellationToken token, CancellationToken other) - { - if (token.IsCancellationRequested || other.IsCancellationRequested) - { - throw new TaskCanceledException(); - } - - if (!token.CanBeCanceled) - { - return other; - } - - if (!other.CanBeCanceled) - { - return token; - } - - return CancellationTokenSource.CreateLinkedTokenSource(token, other).Token; - } - - private static async Task WrapTask(this Task task, CancellationToken cancellation) - { - var cancellationSource = new TaskCompletionSource(); - await using var registration = cancellation.Register(tcs => ((TaskCompletionSource)tcs).TrySetResult(true), cancellationSource); - - await Task.WhenAny(task, cancellationSource.Task).ConfigureAwait(false); - - if (cancellation.IsCancellationRequested) - { - throw new TaskCanceledException(); - } - - return await task.ConfigureAwait(false); - } } diff --git a/tests/RabbitMQ.Next.Tests/Tasks/TaskExtensionsTests.cs b/tests/RabbitMQ.Next.Tests/Tasks/TaskExtensionsTests.cs deleted file mode 100644 index a3ac390..0000000 --- a/tests/RabbitMQ.Next.Tests/Tasks/TaskExtensionsTests.cs +++ /dev/null @@ -1,83 +0,0 @@ -using System.Threading; -using System.Threading.Tasks; -using RabbitMQ.Next.Tasks; -using Xunit; - -namespace RabbitMQ.Next.Tests.Tasks; - -public class TaskExtensionsTests -{ - [Fact] - public async Task WithCancellationAlreadyCompleted() - { - var task = Task.FromResult(2); - var cancellation = new CancellationTokenSource(); - - var wrapped = task.WithCancellation(cancellation.Token); - await Task.Yield(); - - Assert.True(wrapped.IsCompleted); - Assert.Equal(2, await wrapped); - Assert.Equal(task, wrapped); - } - - [Fact] - public async Task WithCancellationDefault() - { - var tcs = new TaskCompletionSource(); - var task = tcs.Task; - - var wrapped = task.WithCancellation(default); - await Task.Yield(); - tcs.SetResult(5); - - Assert.True(wrapped.IsCompleted); - Assert.Equal(5, await wrapped); - Assert.Equal(task, wrapped); - } - - [Fact] - public async Task WithCancellationAlreadyCancelled() - { - var tcs = new TaskCompletionSource(); - var task = tcs.Task; - tcs.SetCanceled(); - var cancellation = new CancellationTokenSource(); - - var wrapped = task.WithCancellation(cancellation.Token); - await Task.Yield(); - - Assert.True(wrapped.IsCompleted); - await Assert.ThrowsAsync(async() => await wrapped); - } - - [Fact] - public async Task WithCancellationReturnsResult() - { - var tcs = new TaskCompletionSource(); - var task = tcs.Task; - var cancellation = new CancellationTokenSource(); - - var wrapped = task.WithCancellation(cancellation.Token); - tcs.SetResult(5); - await Task.Delay(10); - - Assert.True(wrapped.IsCompleted); - Assert.Equal(5, await wrapped); - } - - [Fact] - public async Task WithCancellationThrowsOnCancel() - { - var tcs = new TaskCompletionSource(); - var task = tcs.Task; - var cancellation = new CancellationTokenSource(); - - var wrapped = task.WithCancellation(cancellation.Token); - cancellation.Cancel(); - await Task.Delay(100); - - Assert.True(wrapped.IsCanceled); - await Assert.ThrowsAsync(async() => await wrapped); - } -} \ No newline at end of file