diff --git a/DotMP-Tests/ParallelTests.cs b/DotMP-Tests/ParallelTests.cs index e00ed4b3..72225c83 100644 --- a/DotMP-Tests/ParallelTests.cs +++ b/DotMP-Tests/ParallelTests.cs @@ -18,10 +18,12 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; +using System.Runtime; using System.Threading; using DotMP; using FluentAssertions; using Xunit; +using Xunit.Abstractions; namespace DotMPTests { @@ -30,6 +32,17 @@ namespace DotMPTests /// public class ParallelTests { + private readonly ITestOutputHelper output; + + /// + /// Constructor to write output. + /// + /// Output object. + public ParallelTests(ITestOutputHelper output) + { + this.output = output; + } + /// /// Tests to make sure that parallel performance is higher than sequential performance. /// @@ -1091,10 +1104,11 @@ public void Tasking_works() { uint threads = 6; int sleep_duration = 100; - double start = DotMP.Parallel.GetWTime(); int[] tasks_thread_executed = new int[threads]; int total_tasks_executed = 0; + double start = DotMP.Parallel.GetWTime(); + DotMP.Parallel.ParallelRegion(num_threads: threads, action: () => { DotMP.Parallel.Single(0, () => @@ -1119,7 +1133,6 @@ public void Tasking_works() i.Should().Be(2); } elapsed.Should().BeGreaterThan(2.0 * (sleep_duration / 1000.0)); - elapsed.Should().BeLessThan(1.3 * 2.0 * (sleep_duration / 1000.0)); tasks_thread_executed = new int[threads]; int tasks_to_spawn = 100_000; @@ -1539,6 +1552,63 @@ public void Custom_scheduler_works() }); } + /// + /// Checks that nested taskwait works. + /// + [Fact] + public void Nested_taskwait_works() + { + int prog = 0; + + DotMP.Parallel.ParallelRegion(num_threads: 4, action: () => + { + DotMP.Parallel.Master(() => + { + DotMP.Parallel.Task(() => + { + DotMP.Atomic.Inc(ref prog).Should().Be(5); + + var child1 = DotMP.Parallel.Task(() => + { + Thread.Sleep(1000); + DotMP.Atomic.Inc(ref prog).Should().BeInRange(6, 7); + }); + + var child2 = DotMP.Parallel.Task(() => + { + Thread.Sleep(1000); + DotMP.Atomic.Inc(ref prog).Should().BeInRange(6, 7); + }); + + DotMP.Parallel.Taskwait(child1, child2); + + DotMP.Atomic.Inc(ref prog).Should().Be(8); + }); + }); + + DotMP.Atomic.Inc(ref prog).Should().BeLessThanOrEqualTo(4); + + DotMP.Parallel.Taskwait(); + + DotMP.Atomic.Inc(ref prog).Should().BeGreaterThan(8); + }); + } + + /// + /// Ensures that improper usage of taskwait that risks deadlock should throw an exception. + /// + [Fact] + public void Improper_taskwait_should_except() + { + Assert.Throws(() => + { + DotMP.Parallel.ParallelMaster(() => + { + DotMP.Parallel.Task(() => DotMP.Parallel.Taskwait()); + }); + }); + } + /// /// A sample workload for DotMP.Parallel.ParallelFor(). /// diff --git a/DotMP/DependencyGraph.cs b/DotMP/DependencyGraph.cs index 2a9cff84..47f3a48b 100644 --- a/DotMP/DependencyGraph.cs +++ b/DotMP/DependencyGraph.cs @@ -5,11 +5,11 @@ * This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser * General Public License as published by the Free Software Foundation; either version 2.1 of the License, or * (at your option) any later version. - + * * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the * implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public * License for more details. - + * * You should have received a copy of the GNU Lesser General Public License along with this library; if not, * write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ @@ -171,5 +171,15 @@ public void Dispose() { rw_lock.Dispose(); } + + /// + /// Determines if a task has been completed. + /// + /// The ID of the task to check completion. + /// Whether or not the task has been completed. + internal bool TaskIsComplete(T id) + { + return completed.ContainsKey(id); + } } } diff --git a/DotMP/Exceptions.cs b/DotMP/Exceptions.cs index ea5862a6..bfa29f71 100644 --- a/DotMP/Exceptions.cs +++ b/DotMP/Exceptions.cs @@ -77,4 +77,16 @@ public class TooManyIterationsException : Exception /// The message to associate with the exception. public TooManyIterationsException(string msg) : base(msg) { } } + + /// + /// Exception thrown if the wrong taskwait overload was used from within a task. + /// + public class ImproperTaskwaitUsageException : Exception + { + /// + /// Constructor with a message. + /// + /// The message to associate with the exception. + public ImproperTaskwaitUsageException(string msg) : base(msg) { } + } } diff --git a/DotMP/ForkedRegion.cs b/DotMP/ForkedRegion.cs index bbefed5e..f07f9094 100644 --- a/DotMP/ForkedRegion.cs +++ b/DotMP/ForkedRegion.cs @@ -5,11 +5,11 @@ * This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser * General Public License as published by the Free Software Foundation; either version 2.1 of the License, or * (at your option) any later version. - + * * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the * implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public * License for more details. - + * * You should have received a copy of the GNU Lesser General Public License along with this library; if not, * write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ @@ -65,10 +65,12 @@ internal Thread CreateThread(Action omp_fn, int tid, uint num_threads) catch (Exception ex) { this.ex ??= ex; + Parallel.canceled = true; - for (int i = 0; i < num_threads; i++) - if (i != tid) - threads[i].Interrupt(); + if (ex is not ThreadInterruptedException) + for (int i = 0; i < num_threads; i++) + if (i != tid) + threads[i].Interrupt(); } }); } @@ -170,6 +172,7 @@ internal ForkedRegion(uint num_threads, Action omp_fn) internal void StartThreadpool() { in_parallel = true; + Parallel.canceled = false; for (int i = 0; i < reg.num_threads; i++) reg.threads[i].Start(); diff --git a/DotMP/Parallel.cs b/DotMP/Parallel.cs index 64b6d51e..a7f4f8c9 100644 --- a/DotMP/Parallel.cs +++ b/DotMP/Parallel.cs @@ -53,6 +53,14 @@ public static class Parallel /// Current thread num, cached. /// private static ThreadLocal thread_num = new ThreadLocal(() => Convert.ToInt32(Thread.CurrentThread.Name)); + /// + /// The level of task nesting, to determine when to enact barriers and reset the DAG. + /// + private static ThreadLocal task_nesting = new ThreadLocal(() => 0); + /// + /// Determines if the current threadpool has been marked to terminate. + /// + internal static volatile bool canceled = false; /// /// Fixes the arguments for a parallel for loop. @@ -597,10 +605,22 @@ public static void ParallelRegion(Action action, uint? num_threads = null) num_threads ??= Parallel.num_threads; ForkedRegion freg = new ForkedRegion(num_threads.Value, action); + + if (barrier is not null) barrier.Dispose(); barrier = new Barrier((int)num_threads.Value); + + task_nesting.Dispose(); + task_nesting = new ThreadLocal(() => 0); + + TaskingContainer tc = new TaskingContainer(); + tc.ResetDAGNotThreadSafe(); + freg.StartThreadpool(); + freg.reg.num_threads = 1; + single_thread.Clear(); + barrier.Dispose(); barrier = new Barrier(1); } @@ -887,32 +907,65 @@ public static TaskUUID Task(Action action, params TaskUUID[] depends) } /// - /// Wait for all tasks in the queue to complete. - /// Is injected into a Thread's work by the Region constructor, but can also be called manually. - /// The injection is done to ensure that Parallel.Taskwait() is called before a Parallel.ParallelRegion() terminates, - /// guaranteeing all tasks submitted complete. - /// Acts as an implicit Barrier(). + /// Wait for selected tasks in the queue to complete, or for the full queue to empty if no tasks are specified. + /// Acts as an implicit Barrier() if it is not called from within a task. /// - public static void Taskwait() + /// The tasks to wait on. + /// Thrown if a parameter-less taskwait is called from within a thread, which leads to deadlock. + public static void Taskwait(params TaskUUID[] tasks) { + Func check; ForkedRegion fr = new ForkedRegion(); TaskingContainer tc = new TaskingContainer(); int tasks_remaining; + int tasks_completed = 0; - Barrier(); + if (tasks.Length == 0) + { + if (task_nesting.Value > 0) + throw new ImproperTaskwaitUsageException("Using the default taskwait from within a task will result in a deadlock. Try specifying the task to wait on as an argument."); + + check = tr => tr > 0; + } + else + { + check = _ => + { + while (tasks_completed < tasks.Length && tc.TaskIsComplete(tasks[tasks_completed].GetUUID())) + ++tasks_completed; + + return tasks_completed != tasks.Length; + }; + } + + if (task_nesting.Value == 0) + Barrier(); + + ++task_nesting.Value; + + do + { + if (canceled) + throw new ThreadInterruptedException(); - do if (tc.GetNextTask(out Action do_action, out ulong uuid, out tasks_remaining)) + if (tc.GetNextTask(out Action do_action, out ulong uuid, out tasks_remaining)) { do_action(); tc.CompleteTask(uuid); } - while (tasks_remaining > 0); - - Barrier(); + } + while (check(tasks_remaining)); - tc.ResetDAG(); + if (--task_nesting.Value == 0) + { + Barrier(); - Barrier(); + if (tasks_remaining == 0) + { + tc.ResetDAG(); + Barrier(); + } + } } /// diff --git a/DotMP/Tasking.cs b/DotMP/Tasking.cs index 81fcd615..db2aa23d 100644 --- a/DotMP/Tasking.cs +++ b/DotMP/Tasking.cs @@ -5,11 +5,11 @@ * This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser * General Public License as published by the Free Software Foundation; either version 2.1 of the License, or * (at your option) any later version. - + * * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the * implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public * License for more details. - + * * You should have received a copy of the GNU Lesser General Public License along with this library; if not, * write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ @@ -50,6 +50,17 @@ internal void ResetDAG() }); } + /// + /// Resets the DAG to a default state. + /// Allows the garbage collector to collect unused data. + /// Unlike ResetDAG(), this version is not thread-safe! + /// + internal void ResetDAGNotThreadSafe() + { + dag.Dispose(); + dag = new DAG(); + } + /// /// Gets the next task from the queue. /// @@ -105,6 +116,16 @@ internal void CompleteTask(ulong uuid) { dag.CompleteItem(uuid); } + + /// + /// Determines if a task has been completed. + /// + /// The ID of the task to check completion. + /// Whether or not the task has been completed. + internal bool TaskIsComplete(ulong uuid) + { + return dag.TaskIsComplete(uuid); + } } ///