From 7dd971d4e3614e3ec85c52b15fb02e3c695d8d13 Mon Sep 17 00:00:00 2001 From: Ilya Usov Date: Thu, 4 Mar 2021 21:06:08 +0300 Subject: [PATCH] Support Async termination --- rd-net/Lifetimes/Lifetimes.csproj | 4 + .../Lifetimes/Lifetimes/LifetimeDefinition.cs | 286 +++++++-- .../Test.Lifetimes/Lifetimes/LifetimeTest.cs | 599 ++++++++++++++++++ 3 files changed, 842 insertions(+), 47 deletions(-) diff --git a/rd-net/Lifetimes/Lifetimes.csproj b/rd-net/Lifetimes/Lifetimes.csproj index f39dec2cb..bbd2fe26f 100644 --- a/rd-net/Lifetimes/Lifetimes.csproj +++ b/rd-net/Lifetimes/Lifetimes.csproj @@ -59,4 +59,8 @@ + + + + diff --git a/rd-net/Lifetimes/Lifetimes/LifetimeDefinition.cs b/rd-net/Lifetimes/Lifetimes/LifetimeDefinition.cs index 9f64a7d6f..602c8959d 100644 --- a/rd-net/Lifetimes/Lifetimes/LifetimeDefinition.cs +++ b/rd-net/Lifetimes/Lifetimes/LifetimeDefinition.cs @@ -18,6 +18,9 @@ namespace JetBrains.Lifetimes /// You can terminate this definition by method (or which is the same). /// public class LifetimeDefinition : IDisposable +#if !NET35 + ,IAsyncDisposable +#endif { #pragma warning disable 420 #region Statics @@ -27,9 +30,10 @@ public class LifetimeDefinition : IDisposable [PublicAPI] internal static readonly LifetimeDefinition Eternal = new LifetimeDefinition { Id = nameof(Eternal) }; [PublicAPI] internal static readonly LifetimeDefinition Terminated = new LifetimeDefinition { Id = nameof(Terminated) }; + private static CancellationToken CancelledToken => new CancellationToken(canceled: true); + static LifetimeDefinition() { - Terminated.ToCancellationToken(); //to create cts Terminated.Terminate(); } @@ -90,6 +94,9 @@ private int ThreadLocalExecuting(int increment = 0) private const int WaitForExecutingInTerminationTimeoutMsDefault = 500; [PublicAPI] public static int WaitForExecutingInTerminationTimeoutMs = WaitForExecutingInTerminationTimeoutMsDefault; +#if !NET35 + [PublicAPI] public static int WaitForExecutingInAsyncTerminationTimeoutMs = WaitForExecutingInTerminationTimeoutMsDefault; +#endif // use real (sealed) types to allow devirtualization private static readonly IntBitSlice ourExecutingSlice = BitSlice.Int(20); @@ -359,6 +366,9 @@ private bool IncrementStatusIfEqualsTo(LifetimeStatus status) #region Termination public void Dispose() => Terminate(); +#if !NET35 + public ValueTask DisposeAsync() => TerminateAsync(); +#endif [Obsolete("Use `Lifetime.IsAlive` or `Status` field instead")] public bool IsTerminated => Status >= LifetimeStatus.Terminating; @@ -392,26 +402,94 @@ public void Terminate() if (ourExecutingSlice[myState] > 0 /*optimization*/ && !SpinWait.SpinUntil(() => ourExecutingSlice[myState] <= ThreadLocalExecuting(), WaitForExecutingInTerminationTimeoutMs)) { - Log.Warn($"{this}: can't wait for `ExecuteIfAlive` completed on other thread in {WaitForExecutingInTerminationTimeoutMs} ms. Keep termination." + Environment.NewLine - + "This may happen either because of the ExecuteIfAlive failed to complete in a timely manner. In the case there will be following error messages." + Environment.NewLine - + "Or this might happen because of garbage collection or when the thread yielded execution in SpinWait.SpinOnce but did not receive execution back in a timely manner. If you are on JetBrains' Slack see the discussion https://jetbrains.slack.com/archives/CAZEUK2R0/p1606236742208100"); - - ourLogErrorAfterExecution.InterlockedUpdate(ref myState, true); + ErrorAfterExecutions(); } if (!IncrementStatusIfEqualsTo(LifetimeStatus.Canceling)) return; + + DisposeCtsOrExecutionsAwaiter(); Diagnostics(nameof(LifetimeStatus.Terminating)); //Now status is 'Terminating' and we have to wait for all resource modifications to complete. No mutex acquire is possible beyond this point. if (ourMutexSlice[myState]) //optimization SpinWaitEx.SpinUntil(() => !ourMutexSlice[myState]); - - Destruct(); + + Destruct(); Assertion.Assert(Status == LifetimeStatus.Terminated, "{0}: bad status for termination finish", this); Diagnostics(nameof(LifetimeStatus.Terminated)); } +#if !NET35 + /// + /// Asynchronously waits for all before terminating + /// All nested lifetimes and will be terminated asynchronously + /// + /// + /// if called under + [PublicAPI] + public ValueTask TerminateAsync() + { + if (IsEternal || Status > LifetimeStatus.Canceling) + return new ValueTask(); + + Diagnostics(nameof(LifetimeStatus.Canceling)); + + //parent could ask for canceled already + MarkCancelingRecursively(); + + var supportsTerminationUnderExecuting = AllowTerminationUnderExecution; + if (!supportsTerminationUnderExecuting && ThreadLocalExecuting() > 0) + throw new InvalidOperationException($"{this}: can't terminate under `ExecuteIfAlive` because termination doesn't support this. Use `{nameof(AllowTerminationUnderExecution)}`."); + + return TerminateAsync(supportsTerminationUnderExecuting); + } + + private async ValueTask TerminateAsync(bool supportsTerminationUnderExecuting) + { + if (!supportsTerminationUnderExecuting && ourExecutingSlice[myState] > 0) + { + var value = myCtsOrExecutionsAwaiter; + if (value is Disposed || value is ExecutionsAwaiter || Status >= LifetimeStatus.Terminating) + return; // termination already started + + var awaiter = new ExecutionsAwaiter(); + while (true) + { + if (value is Disposed || value is ExecutionsAwaiter || Status >= LifetimeStatus.Terminating) + return; // termination already started + + var originalValue = Interlocked.CompareExchange(ref myCtsOrExecutionsAwaiter, awaiter, value); + if (originalValue == value) break; + + value = originalValue; + } + + if (value is CancellationTokenSource source) + source.Cancel(); + + if (ourExecutingSlice[myState] > 0) + { + var succeeded = await awaiter.WaitAsync(WaitForExecutingInAsyncTerminationTimeoutMs); + if (!succeeded) ErrorAfterExecutions(); + } + } + + if (!IncrementStatusIfEqualsTo(LifetimeStatus.Canceling)) + return; + + DisposeCtsOrExecutionsAwaiter(); + + Diagnostics(nameof(LifetimeStatus.Terminating)); + //Now status is 'Terminating' and we have to wait for all resource modifications to complete. No mutex acquire is possible beyond this point. + if (ourMutexSlice[myState]) //optimization + SpinWaitEx.SpinUntil(() => !ourMutexSlice[myState]); + + await DestructAsync(); + Assertion.Assert(Status == LifetimeStatus.Terminated, "{0}: bad status for termination finish", this); + Diagnostics(nameof(LifetimeStatus.Terminated)); + } +#endif private void MarkCancelingRecursively() { @@ -419,8 +497,8 @@ private void MarkCancelingRecursively() if (!IncrementStatusIfEqualsTo(LifetimeStatus.Alive)) return; - - myCts?.Cancel(); + + (myCtsOrExecutionsAwaiter as CancellationTokenSource)?.Cancel(); // Some other thread can already begin destructuring // Then children lifetimes become canceled in their termination @@ -436,8 +514,76 @@ private void MarkCancelingRecursively() } } + private void ErrorAfterExecutions() + { + ourLogErrorAfterExecution.InterlockedUpdate(ref myState, true); + + Log.Warn($"{this}: can't wait for `ExecuteIfAlive` completed on other thread in {WaitForExecutingInTerminationTimeoutMs} ms. Keep termination." + Environment.NewLine + + "This may happen either because of the ExecuteIfAlive failed to complete in a timely manner. In the case there will be following error messages." + Environment.NewLine + + "Or this might happen because of garbage collection or when the thread yielded execution in SpinWait.SpinOnce but did not receive execution back in a timely manner. If you are on JetBrains' Slack see the discussion https://jetbrains.slack.com/archives/CAZEUK2R0/p1606236742208100"); + } + #if !NET35 [System.Runtime.ExceptionServices.HandleProcessCorruptedStateExceptions] + private async ValueTask DestructAsync() + { + var status = Status; + Assertion.Assert(status == LifetimeStatus.Terminating, "{0}: bad status for destructuring start", this); + Assertion.Assert(ourMutexSlice[myState] == false, "{0}: mutex must be released in this point", this); + //no one can take mutex after this point + + var resources = myResources; + Assertion.AssertNotNull(resources, "{0}: `resources` can't be null on destructuring stage", this); + + Assertion.Assert(myCtsOrExecutionsAwaiter == Disposed.Instance, "myCtsOrExecutionsAwaiter == DisposedAwaiter.Instance"); + + for (var i = myResCount - 1; i >= 0; i--) + { + try + { + switch (resources[i]) + { + case Action a: + a(); + break; + + case LifetimeDefinition ld: + await ld.TerminateAsync(); + break; + + case IAsyncDisposable ad: + await ad.DisposeAsync(); + break; + + case IDisposable d: + d.Dispose(); + break; + + case ITerminationHandler th: + th.OnTermination(Lifetime); + break; + + default: + Log.Error("{0}: unknown type of termination resource: {1}", this, resources[i]); + break; + } + } + catch (Exception e) + { + Log.Error(e, $"{this}: exception on termination of resource[{i}]: ${resources[i]}"); + } + } + + myResources = null; + myResCount = 0; + + var statusIncrementedSuccessfully = IncrementStatusIfEqualsTo(LifetimeStatus.Terminating); + Assertion.Assert(statusIncrementedSuccessfully, "{0}: bad status for destructuring finish", this); + } +#endif + + #if !NET35 + [System.Runtime.ExceptionServices.HandleProcessCorruptedStateExceptions] #endif private void Destruct() { @@ -449,6 +595,8 @@ private void Destruct() var resources = myResources; Assertion.AssertNotNull(resources, "{0}: `resources` can't be null on destructuring stage", this); + Assertion.Assert(myCtsOrExecutionsAwaiter == Disposed.Instance, "myCtsOrExecutionsAwaiter == DisposedAwaiter.Instance"); + for (var i = myResCount - 1; i >= 0; i--) { try @@ -485,15 +633,26 @@ private void Destruct() myResources = null; myResCount = 0; - //In fact we shouldn't make cts null, because it should provide stable CancellationToken to finish enclosing tasks in Canceled state (not Faulted) - //But to avoid memory leaks we must do it. So if you 1) run task with alive lifetime 2) terminate lifetime 3) in task invoke ThrowIfNotAlive() you can obtain `Faulted` state rather than `Canceled`. But it doesn't matter in `async-await` programming. - if (!ReferenceEquals(this, Terminated)) - myCts = null; - var statusIncrementedSuccessfully = IncrementStatusIfEqualsTo(LifetimeStatus.Terminating); Assertion.Assert(statusIncrementedSuccessfully, "{0}: bad status for destructuring finish", this); } + private void DisposeCtsOrExecutionsAwaiter() + { + Assertion.Assert(myCtsOrExecutionsAwaiter != Disposed.Instance, "myCtsOrExecutionsAwaiter != Disposed.Instance"); + + var originValue = Interlocked.Exchange(ref myCtsOrExecutionsAwaiter, Disposed.Instance); + if (originValue is CancellationTokenSource source) source.Cancel(); +#if !NET35 + else if (originValue is ExecutionsAwaiter awaiter) awaiter.TryFire(); +#endif + } + + private class Disposed + { + public static readonly Disposed Instance = new Disposed(); + } + @@ -662,10 +821,40 @@ private static Result WrapOrThrow([NotNull] Func action, bool wrap) return wrap ? Result.Wrap(action) : Result.Success(action()); } + private class ExecutionsAwaiter + { +#if !NET35 + private readonly TaskCompletionSource myTcs; + + public ExecutionsAwaiter() + { + myTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + } + + public Task WaitAsync(int timeoutMs) + { + var task = myTcs.Task; + if (task.IsCompleted) + return task; + + var timer = new System.Timers.Timer(timeoutMs) {AutoReset = false}; + timer.Elapsed += (_, __) => myTcs.TrySetResult(false); + timer.Start(); + + task.ContinueWith(t => timer.Dispose(), TaskScheduler.Default); + return task; + } + + public bool TryFire() => myTcs.TrySetResult(true) || myTcs.Task.Result; +#else + public ExecutionsAwaiter() => throw new NotSupportedException("Must not be created for NET35"); +#endif + } + /// /// Must be used only by /// - public struct ExecuteIfAliveCookie : IDisposable + public readonly struct ExecuteIfAliveCookie : IDisposable { [NotNull] private readonly LifetimeDefinition myDef; @@ -708,7 +897,7 @@ public void Dispose() if (!Succeed) return; - Interlocked.Decrement(ref myDef.myState); + var state = Interlocked.Decrement(ref myDef.myState); if (!myDisableIncrementThreadLocalExecuting) myDef.ThreadLocalExecuting(-1); @@ -716,10 +905,16 @@ public void Dispose() if (myAllowTerminationUnderExecuting) ourAllowTerminationUnderExecutionThreadStatic--; - if (ourLogErrorAfterExecution[myDef.myState]) + var shouldLogError = ourLogErrorAfterExecution[state]; +#if !NET35 + if (ourExecutingSlice[state] == 0 && Memory.VolatileRead(ref myDef.myCtsOrExecutionsAwaiter) is ExecutionsAwaiter awaiter) + shouldLogError = !awaiter.TryFire() || shouldLogError; +#endif + + if (shouldLogError) { Log.Error($"ExecuteIfAlive after termination of {myDef} took too much time (>{WaitForExecutingInTerminationTimeoutMs}ms)"); - } + } } } @@ -897,7 +1092,7 @@ internal T Bracket([NotNull] Func opening, [NotNull] Action closing) #region Cancellation - private CancellationTokenSource myCts; + private object myCtsOrExecutionsAwaiter; //Only if state >= Canceling @@ -907,18 +1102,26 @@ internal T Bracket([NotNull] Func opening, [NotNull] Action closing) private Result CanceledResult() => Result.Canceled(CanceledException()); - private void CreateCtsLazily() + private CancellationToken CreateCancellationToken() { - if (myCts != null) return; - + Assertion.Assert(!ReferenceEquals(this, Terminated), "Mustn't reach this point on lifetime `Terminated`"); + var cts = new CancellationTokenSource(); - Memory.Barrier(); - //to suppress reordering of init and ctor visible from outside - myCts = cts; - //But MarkCanceledRecursively may already happen, so we need to help Cancel source - if (Status != LifetimeStatus.Alive) - myCts.Cancel(); + var originalValue = Interlocked.CompareExchange(ref myCtsOrExecutionsAwaiter, cts, null); + if (originalValue is CancellationTokenSource source) + { + cts.Cancel(); + return source.Token; + } + + if (originalValue != null || Status != LifetimeStatus.Alive) + { + cts.Cancel(); + return CancelledToken; + } + + return cts.Token; } /// @@ -933,24 +1136,13 @@ public void ThrowIfNotAlive() internal CancellationToken ToCancellationToken(bool doNotCreateCts = false) { - if (myCts == null) - { - if (doNotCreateCts) - return Terminated.ToCancellationToken(); - - using (var mutex = new UnderMutexCookie(this, LifetimeStatus.Alive)) - { - if (!mutex.Success) - { - Assertion.Assert(!ReferenceEquals(this, Terminated), "Mustn't reach this point on lifetime `Terminated`"); - return Terminated.ToCancellationToken(); //to get stable CancellationTokenSource (for tasks to finish in Canceling state, rather than Faulted) - } - - CreateCtsLazily(); - } - } - - return myCts.Token; + if (myCtsOrExecutionsAwaiter is CancellationTokenSource source) + return source.Token; + + if (doNotCreateCts || Status != LifetimeStatus.Alive) + return CancelledToken; + + return CreateCancellationToken(); } diff --git a/rd-net/Test.Lifetimes/Lifetimes/LifetimeTest.cs b/rd-net/Test.Lifetimes/Lifetimes/LifetimeTest.cs index 08be2f43b..fc20aac9f 100644 --- a/rd-net/Test.Lifetimes/Lifetimes/LifetimeTest.cs +++ b/rd-net/Test.Lifetimes/Lifetimes/LifetimeTest.cs @@ -3,11 +3,13 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; +using JetBrains.Collections.Viewable; using JetBrains.Core; using JetBrains.Diagnostics; using JetBrains.Diagnostics.Internal; using JetBrains.Lifetimes; using JetBrains.Threading; +using JetBrains.Util.Internal; using NUnit.Framework; namespace Test.Lifetimes.Lifetimes @@ -257,6 +259,62 @@ void LoggerHandler(LeveledMessage message) Assert.IsTrue(warningReceived, "Warning `{0}` must have been logged", expectedWarningText); Assert.IsTrue(exceptionReceived, "Exception `{0}` must have been logged", expectedExceptionText); } + +#if !NET35 + [Test] + public void TestLongTryExecuteAsync() + { + const string expectedWarningText = "can't wait for `ExecuteIfAlive` completed on other thread"; + const string expectedExceptionText = "ExecuteIfAlive after termination of"; + bool warningReceived = false, exceptionReceived = false; + + Lifetime.Using(lifetime => + { + void LoggerHandler(LeveledMessage message) + { + if (message.Level == LoggingLevel.WARN && message.FormattedMessage.Contains(expectedWarningText)) + warningReceived = true; + } + + lifetime.Bracket( + () => TestLogger.ExceptionLogger.Handlers += LoggerHandler, + () => TestLogger.ExceptionLogger.Handlers -= LoggerHandler + ); + + var lifetimeDefinition = lifetime.CreateNested(); + var lifetimeTerminatedEvent = new ManualResetEvent(false); + var backgroundThreadIsInTryExecuteEvent = new ManualResetEvent(false); + var thread = new Thread(() => lifetimeDefinition.Lifetime.TryExecute(() => + { + backgroundThreadIsInTryExecuteEvent.Set(); + lifetimeTerminatedEvent.WaitOne(); + })); + thread.Start(); + backgroundThreadIsInTryExecuteEvent.WaitOne(); + + var terminationTask = lifetimeDefinition.TerminateAsync(); + SpinWait.SpinUntil(() => terminationTask.IsCompleted); + + lifetimeTerminatedEvent.Set(); + thread.Join(); + try + { + TestLogger.ExceptionLogger.ThrowLoggedExceptions(); + } + catch (Exception e) + { + if (!e.Message.Contains(expectedExceptionText)) + throw; + + exceptionReceived = true; + } + }); + + Assert.IsTrue(warningReceived, "Warning `{0}` must have been logged", expectedWarningText); + Assert.IsTrue(exceptionReceived, "Exception `{0}` must have been logged", expectedExceptionText); + } +#endif + [Test] public void TestBracketGood() @@ -1171,6 +1229,547 @@ public void TestTerminatesAfter() Thread.Sleep(200); Assert.True(lf.IsNotAlive); } + + [Test] + public void DoubleAsyncTerminationTest() + { + { + var definition = new LifetimeDefinition(); + var count = 0; + definition.Lifetime.OnTermination(() => count++); + definition.Terminate(); + Assert.AreEqual(1, count); + Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully); + Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully); + definition.Terminate(); + Assert.AreEqual(1, count); + } + + { + var definition = new LifetimeDefinition(); + var count = 0; + definition.Lifetime.OnTermination(() => count++); + Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully); + Assert.AreEqual(1, count); + definition.Terminate(); + definition.Terminate(); + Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully); + Assert.AreEqual(1, count); + } + + { + var definition = new LifetimeDefinition(); + var count = 0; + definition.Lifetime.OnTermination(() => count++); + var e1 = new ManualResetEvent(false); + var e2 = new ManualResetEvent(false); + var task = TestLifetime.Start(TaskScheduler.Default, () => + { + using (definition.Lifetime.UsingExecuteIfAlive()) + { + e1.Set(); + e2.WaitOne(); + } + }); + + e1.WaitOne(); + var t = definition.TerminateAsync(); + Assert.IsFalse(t.IsCompleted); + + Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully); + e2.Set(); + + Assert.IsTrue(task.Wait(TimeSpan.FromSeconds(10))); + Assert.IsTrue(t.AsTask().Wait(TimeSpan.FromSeconds(10))); + + Assert.AreEqual(1, count); + } + } + + [Test] + public void ConcurrentDoubleAsyncTermination() + { + for (var i = 0; i < 10000; i++) + { + var definition = new LifetimeDefinition(); + var count = 0; + definition.Lifetime.OnTermination(() => count++); + + const int threadsCount = 10; + var threads = 0; + var tasks = Enumerable.Range(0, threadsCount).Select(_ => TestLifetime.Start(TaskScheduler.Default, () => + { + Interlocked.Increment(ref threads); + SpinWait.SpinUntil(() => Memory.VolatileRead(ref threads) == threadsCount); // sync threads + + Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully); + })).ToArray(); + + Assert.IsTrue(Task.WaitAll(tasks, TimeSpan.FromMinutes(1))); + Assert.AreEqual(1, count); + } + } + + [Test] + public void SimpleTerminateAsyncTest() + { + { + var definition = new LifetimeDefinition(); + var currentThread = Thread.CurrentThread; + var called = false; + definition.Lifetime.OnTermination(() => + { + Assert.AreEqual(currentThread, Thread.CurrentThread); + called = true; + }); + + var task = definition.TerminateAsync(); + Assert.IsTrue(task.IsCompletedSuccessfully); + Assert.IsTrue(called); + } + + { + var scheduler = SingleThreadScheduler.RunOnSeparateThread(TestLifetime, "TestScheduler"); + + var definition = new LifetimeDefinition(); + var called = false; + definition.Lifetime.OnTermination(() => + { + scheduler.AssertThread(); + called = true; + }); + + Task task; + var e1 = new SemaphoreSlim(0); + var e2 = new SemaphoreSlim(0); + using (var cookie = definition.Lifetime.UsingExecuteIfAlive()) + { + Assert.IsTrue(cookie.Succeed); + task = TestLifetime.StartAsync(scheduler, async () => + { + scheduler.AssertThread(); + var task = definition.TerminateAsync(); + Assert.IsFalse(task.IsCompletedSuccessfully); + Assert.IsFalse(called); + + scheduler.Queue(() => e1.Release()); + await e2.WaitAsync(); + + scheduler.AssertThread(); + Assert.IsTrue(task.IsCompletedSuccessfully); + Assert.IsTrue(called); + }); + + e1.Wait(); + } + + e2.Release(); + Assert.IsTrue(task.Wait(TimeSpan.FromSeconds(10))); + + Assert.AreEqual(TaskStatus.RanToCompletion, task.Status); + Assert.IsTrue(called); + } + } + + [Test] + public void SimpleNestedAsyncTermination() + { + var def1 = new LifetimeDefinition(); + var def2 = def1.Lifetime.CreateNested(); + + var e1 = new ManualResetEvent(false); + var e2 = new ManualResetEvent(false); + + TestLifetime.Start(TaskScheduler.Default, () => + { + using (var cookie = def2.UsingExecuteIfAlive()) + { + Assert.IsTrue(cookie.Succeed); + + e1.Set(); + e2.WaitOne(); + } + }); + + e1.WaitOne(); + + var task = def1.TerminateAsync(); + Assert.IsFalse(task.IsCompleted); + Assert.AreEqual(LifetimeStatus.Terminating, def1.Status); + Assert.AreEqual(LifetimeStatus.Canceling, def2.Status); + + e2.Set(); + + Assert.IsTrue(task.AsTask().Wait(TimeSpan.FromSeconds(10))); + + Assert.IsTrue(task.IsCompletedSuccessfully); + Assert.AreEqual(LifetimeStatus.Terminated, def1.Status); + Assert.AreEqual(LifetimeStatus.Terminated, def2.Status); + } + + [Test] + public void AsyncDisposableTerminationTest() + { + var disposable = new TestAsyncDisposable(); + var definition = new LifetimeDefinition(); + definition.Lifetime.OnTermination(disposable); + + var scheduler = new SequentialScheduler("TestScheduler", TestLifetime); + var task = TestLifetime.StartAsync(scheduler, async () => + { + var t = definition.TerminateAsync(); + Assert.IsFalse(t.IsCompleted); + Assert.AreEqual(LifetimeStatus.Terminating, definition.Status); + + Assert.IsTrue(disposable.Disposing); + Assert.IsFalse(disposable.Disposed); + + await Task.Yield(); + + Assert.IsTrue(disposable.Disposed); + Assert.IsTrue(t.IsCompletedSuccessfully); + Assert.AreEqual(LifetimeStatus.Terminated, definition.Status); + }); + + Assert.IsTrue(task.Wait(TimeSpan.FromSeconds(10))); + Assert.AreEqual(TaskStatus.RanToCompletion, task.Status); + Assert.AreEqual(LifetimeStatus.Terminated, definition.Status); + } + + [Test] + public void AllowAsyncTerminationTest() + { + var definition = new LifetimeDefinition {AllowTerminationUnderExecution = true}; + var called = false; + definition.Lifetime.OnTermination(() => called = true); + using (definition.Lifetime.UsingExecuteIfAlive()) + { + definition.TerminateAsync(); + Assert.IsTrue(called); + } + } + + [Test] + public void AsyncTerminationUnderExecutionErrorTest() + { + var definition = new LifetimeDefinition(); + var called = false; + var thread = Thread.CurrentThread; + definition.Lifetime.OnTermination(() => + { + Assert.AreEqual(thread, Thread.CurrentThread); + called = true; + }); + using (definition.Lifetime.UsingExecuteIfAlive()) + { + Assert.Throws(() => definition.TerminateAsync()); + Assert.IsFalse(called); + } + Assert.IsFalse(called); + + Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully); + Assert.IsTrue(called); + } + + [Test] + public void CancellationTokenAsyncTerminationTest() + { + { + var definition = new LifetimeDefinition(); + var token = definition.Lifetime.ToCancellationToken(); + definition.TerminateAsync(); + Assert.IsTrue(token.IsCancellationRequested); + } + + { + var definition = new LifetimeDefinition(); + definition.TerminateAsync(); + var token = definition.Lifetime.ToCancellationToken(); + Assert.IsTrue(token.IsCancellationRequested); + } + + { + var definition = new LifetimeDefinition(); + var token = definition.Lifetime.ToCancellationToken(); + var e1 = new ManualResetEvent(false); + var e2 = new ManualResetEvent(false); + TestLifetime.Start(TaskScheduler.Default, () => + { + using (definition.Lifetime.UsingExecuteIfAlive()) + { + e1.Set(); + e2.WaitOne(); + } + }); + + var task = definition.TerminateAsync(); + Assert.IsTrue(token.IsCancellationRequested); + e2.Set(); + Assert.IsTrue(task.AsTask().Wait(TimeSpan.FromSeconds(10))); + } + + { + var definition = new LifetimeDefinition(); + var e1 = new ManualResetEvent(false); + var e2 = new ManualResetEvent(false); + TestLifetime.Start(TaskScheduler.Default, () => + { + using (definition.Lifetime.UsingExecuteIfAlive()) + { + e1.Set(); + e2.WaitOne(); + } + }); + + var task = definition.TerminateAsync(); + var token = definition.Lifetime.ToCancellationToken(); + Assert.IsTrue(token.IsCancellationRequested); + e2.Set(); + Assert.IsTrue(task.AsTask().Wait(TimeSpan.FromSeconds(10))); + } + + { + var definition = new LifetimeDefinition(); + var e1 = new ManualResetEvent(false); + var e2 = new ManualResetEvent(false); + TestLifetime.Start(TaskScheduler.Default, () => + { + using (definition.Lifetime.UsingExecuteIfAlive()) + { + e1.Set(); + e2.WaitOne(); + } + }); + + var task = definition.TerminateAsync(); + e2.Set(); + Assert.IsTrue(task.AsTask().Wait(TimeSpan.FromSeconds(10))); + var token = definition.Lifetime.ToCancellationToken(); + Assert.IsTrue(token.IsCancellationRequested); + } + } + + [Test] + public void ConcurrentCancellationTokenAndAsyncTerminationTest() + { + for (int i = 0; i < 10000; i++) + { + var definition = new LifetimeDefinition(); + var e1 = new ManualResetEvent(false); + var e2 = new ManualResetEvent(false); + + var executionTask = TestLifetime.Start(TaskScheduler.Default, () => + { + using (definition.Lifetime.UsingExecuteIfAlive()) + { + e1.Set(); + e2.WaitOne(); + } + }); + + e1.WaitOne(); + + const int threadsCount = 10; + var threads = 0; + + var tasks = Enumerable.Range(0, threadsCount).Select(num => TestLifetime.StartAsync(TaskScheduler.Default, async () => + { + Interlocked.Increment(ref threads); + SpinWait.SpinUntil(() => Memory.VolatileRead(ref threads) == threadsCount); + + if (num % 2 == 0) + return definition.ToCancellationToken(); + + await definition.TerminateAsync(); + return definition.ToCancellationToken(); + })).ToArray(); + + e2.Set(); + + var whenAllTask = Task.WhenAll(tasks); + Assert.IsTrue(whenAllTask.Wait(TimeSpan.FromMinutes(1))); + + var tokens = whenAllTask.Result; + Assert.IsTrue(tokens.All(x => x.IsCancellationRequested)); + + executionTask.Wait(TimeSpan.FromSeconds(10)); + } + } + + [Test] + public void ConcurrentCancellationTokenAndTerminationTest() + { + for (int i = 0; i < 10000; i++) + { + var definition = new LifetimeDefinition(); + var e1 = new ManualResetEvent(false); + var e2 = new ManualResetEvent(false); + + var executionTask = TestLifetime.Start(TaskScheduler.Default, () => + { + using (definition.Lifetime.UsingExecuteIfAlive()) + { + e1.Set(); + e2.WaitOne(); + } + }); + + e1.WaitOne(); + + const int threadsCount = 10; + var threads = 0; + + var tasks = Enumerable.Range(0, threadsCount).Select(num => TestLifetime.Start(TaskScheduler.Default, () => + { + Interlocked.Increment(ref threads); + SpinWait.SpinUntil(() => Memory.VolatileRead(ref threads) == threadsCount); + + if (num % 2 == 0) + return definition.ToCancellationToken(); + + definition.Terminate(); + return definition.ToCancellationToken(); + })).ToArray(); + + e2.Set(); + + var whenAllTask = Task.WhenAll(tasks); + Assert.IsTrue(whenAllTask.Wait(TimeSpan.FromMinutes(1))); + + var tokens = whenAllTask.Result; + Assert.IsTrue(tokens.All(x => x.IsCancellationRequested)); + + executionTask.Wait(TimeSpan.FromSeconds(10)); + } + } + + + [Test] + public void ConcurrentToCancellationTokenTest() + { + for (int i = 0; i < 10000; i++) + { + var definition = new LifetimeDefinition(); + + const int threadsCount = 10; + var threads = 0; + + var tasks = Enumerable.Range(0, threadsCount).Select(num => TestLifetime.Start(TaskScheduler.Default, () => + { + Interlocked.Increment(ref threads); + SpinWait.SpinUntil(() => Memory.VolatileRead(ref threads) == threadsCount); + + return definition.ToCancellationToken(); + })).ToArray(); + + var whenAllTask = Task.WhenAll(tasks); + Assert.IsTrue(whenAllTask.Wait(TimeSpan.FromMinutes(1))); + + var tokens = whenAllTask.Result; + var singleToken = tokens.Distinct().Single(); + Assert.IsFalse(singleToken.IsCancellationRequested); + + if (i % 2 == 0) + Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully); + else + definition.Terminate(); + + Assert.IsTrue(singleToken.IsCancellationRequested); + } + } + + [Test] + public void TerminatedToCancellationToken() + { + var token = LifetimeDefinition.Terminated.ToCancellationToken(); + Assert.AreEqual(token, new CancellationToken(true)); + } + + [Test] + public void RecursiveTerminateAsync() + { + { + var definition = new LifetimeDefinition(); + var count = 0; + definition.Lifetime.OnTermination(() => count++); + definition.Lifetime.OnTermination(() => definition.Terminate()); + Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully); + Assert.AreEqual(1, count); + Assert.AreEqual(LifetimeStatus.Terminated, definition.Status); + } + + { + var definition = new LifetimeDefinition(); + var count = 0; + definition.Lifetime.OnTermination(() => count++); + definition.Lifetime.OnTermination(() => + { + Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully); + }); + + Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully); + Assert.AreEqual(1, count); + Assert.AreEqual(LifetimeStatus.Terminated, definition.Status); + } + + { + var definition = new LifetimeDefinition(); + var nested = definition.Lifetime.CreateNested(); + var count = 0; + definition.Lifetime.OnTermination(() => count++); + nested.Lifetime.OnTermination(() => + { + Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully); + }); + + Assert.IsTrue(definition.TerminateAsync().IsCompletedSuccessfully); + Assert.AreEqual(1, count); + Assert.AreEqual(LifetimeStatus.Terminated, definition.Status); + Assert.AreEqual(LifetimeStatus.Terminated, nested.Status); + } + } + + [Test] + public void NonAsyncDisposableTerminationTest() + { + var definition = new LifetimeDefinition(); + var disposable = new TestNonAsyncDisposable(); + definition.Lifetime.OnTermination(disposable); + Assert.IsFalse(disposable.Disposed); + + definition.Terminate(); + Assert.IsTrue(disposable.Disposed); + } + + private class TestNonAsyncDisposable : IDisposable, IAsyncDisposable + { + public bool Disposing { get; private set; } + public bool Disposed { get; private set; } + + public void Dispose() + { + Disposing = true; + Disposed = true; + } + + public ValueTask DisposeAsync() => throw new NotImplementedException(); // must not be called + } + + + private class TestAsyncDisposable : IDisposable, IAsyncDisposable + { + public bool Disposing { get; private set; } + public bool Disposed { get; private set; } + + public void Dispose() => throw new NotImplementedException(); // must not be called + + public async ValueTask DisposeAsync() + { + Disposing = true; + await Task.Yield(); + Disposed = true; + } + } #endif