Skip to content

Commit

Permalink
Merge pull request #122 from computablee/bug-fixes
Browse files Browse the repository at this point in the history
Allow taskwait from within tasks
  • Loading branch information
computablee authored Nov 25, 2023
2 parents aa833ba + 4203bdd commit 4c02a0b
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 24 deletions.
74 changes: 72 additions & 2 deletions DotMP-Tests/ParallelTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime;
using System.Threading;
using DotMP;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace DotMPTests
{
Expand All @@ -30,6 +32,17 @@ namespace DotMPTests
/// </summary>
public class ParallelTests
{
private readonly ITestOutputHelper output;

/// <summary>
/// Constructor to write output.
/// </summary>
/// <param name="output">Output object.</param>
public ParallelTests(ITestOutputHelper output)
{
this.output = output;
}

/// <summary>
/// Tests to make sure that parallel performance is higher than sequential performance.
/// </summary>
Expand Down Expand Up @@ -1091,10 +1104,11 @@ public void Tasking_works()
{
uint threads = 6;
int sleep_duration = 100;
double start = DotMP.Parallel.GetWTime();
int[] tasks_thread_executed = new int[threads];
int total_tasks_executed = 0;

double start = DotMP.Parallel.GetWTime();

DotMP.Parallel.ParallelRegion(num_threads: threads, action: () =>
{
DotMP.Parallel.Single(0, () =>
Expand All @@ -1119,7 +1133,6 @@ public void Tasking_works()
i.Should().Be(2);
}
elapsed.Should().BeGreaterThan(2.0 * (sleep_duration / 1000.0));
elapsed.Should().BeLessThan(1.3 * 2.0 * (sleep_duration / 1000.0));

tasks_thread_executed = new int[threads];
int tasks_to_spawn = 100_000;
Expand Down Expand Up @@ -1539,6 +1552,63 @@ public void Custom_scheduler_works()
});
}

/// <summary>
/// Checks that nested taskwait works.
/// </summary>
[Fact]
public void Nested_taskwait_works()
{
int prog = 0;

DotMP.Parallel.ParallelRegion(num_threads: 4, action: () =>
{
DotMP.Parallel.Master(() =>
{
DotMP.Parallel.Task(() =>
{
DotMP.Atomic.Inc(ref prog).Should().Be(5);

var child1 = DotMP.Parallel.Task(() =>
{
Thread.Sleep(1000);
DotMP.Atomic.Inc(ref prog).Should().BeInRange(6, 7);
});

var child2 = DotMP.Parallel.Task(() =>
{
Thread.Sleep(1000);
DotMP.Atomic.Inc(ref prog).Should().BeInRange(6, 7);
});

DotMP.Parallel.Taskwait(child1, child2);

DotMP.Atomic.Inc(ref prog).Should().Be(8);
});
});

DotMP.Atomic.Inc(ref prog).Should().BeLessThanOrEqualTo(4);

DotMP.Parallel.Taskwait();

DotMP.Atomic.Inc(ref prog).Should().BeGreaterThan(8);
});
}

/// <summary>
/// Ensures that improper usage of taskwait that risks deadlock should throw an exception.
/// </summary>
[Fact]
public void Improper_taskwait_should_except()
{
Assert.Throws<DotMP.Exceptions.ImproperTaskwaitUsageException>(() =>
{
DotMP.Parallel.ParallelMaster(() =>
{
DotMP.Parallel.Task(() => DotMP.Parallel.Taskwait());
});
});
}

/// <summary>
/// A sample workload for DotMP.Parallel.ParallelFor().
/// </summary>
Expand Down
14 changes: 12 additions & 2 deletions DotMP/DependencyGraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
* This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser
* General Public License as published by the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
* implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this library; if not,
* write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
Expand Down Expand Up @@ -171,5 +171,15 @@ public void Dispose()
{
rw_lock.Dispose();
}

/// <summary>
/// Determines if a task has been completed.
/// </summary>
/// <param name="id">The ID of the task to check completion.</param>
/// <returns>Whether or not the task has been completed.</returns>
internal bool TaskIsComplete(T id)
{
return completed.ContainsKey(id);
}
}
}
12 changes: 12 additions & 0 deletions DotMP/Exceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,16 @@ public class TooManyIterationsException : Exception
/// <param name="msg">The message to associate with the exception.</param>
public TooManyIterationsException(string msg) : base(msg) { }
}

/// <summary>
/// Exception thrown if the wrong taskwait overload was used from within a task.
/// </summary>
public class ImproperTaskwaitUsageException : Exception
{
/// <summary>
/// Constructor with a message.
/// </summary>
/// <param name="msg">The message to associate with the exception.</param>
public ImproperTaskwaitUsageException(string msg) : base(msg) { }
}
}
13 changes: 8 additions & 5 deletions DotMP/ForkedRegion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
* This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser
* General Public License as published by the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
* implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this library; if not,
* write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
Expand Down Expand Up @@ -65,10 +65,12 @@ internal Thread CreateThread(Action omp_fn, int tid, uint num_threads)
catch (Exception ex)
{
this.ex ??= ex;
Parallel.canceled = true;

for (int i = 0; i < num_threads; i++)
if (i != tid)
threads[i].Interrupt();
if (ex is not ThreadInterruptedException)
for (int i = 0; i < num_threads; i++)
if (i != tid)
threads[i].Interrupt();
}
});
}
Expand Down Expand Up @@ -170,6 +172,7 @@ internal ForkedRegion(uint num_threads, Action omp_fn)
internal void StartThreadpool()
{
in_parallel = true;
Parallel.canceled = false;

for (int i = 0; i < reg.num_threads; i++)
reg.threads[i].Start();
Expand Down
79 changes: 66 additions & 13 deletions DotMP/Parallel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ public static class Parallel
/// Current thread num, cached.
/// </summary>
private static ThreadLocal<int> thread_num = new ThreadLocal<int>(() => Convert.ToInt32(Thread.CurrentThread.Name));
/// <summary>
/// The level of task nesting, to determine when to enact barriers and reset the DAG.
/// </summary>
private static ThreadLocal<uint> task_nesting = new ThreadLocal<uint>(() => 0);
/// <summary>
/// Determines if the current threadpool has been marked to terminate.
/// </summary>
internal static volatile bool canceled = false;

/// <summary>
/// Fixes the arguments for a parallel for loop.
Expand Down Expand Up @@ -597,10 +605,22 @@ public static void ParallelRegion(Action action, uint? num_threads = null)
num_threads ??= Parallel.num_threads;

ForkedRegion freg = new ForkedRegion(num_threads.Value, action);

if (barrier is not null) barrier.Dispose();
barrier = new Barrier((int)num_threads.Value);

task_nesting.Dispose();
task_nesting = new ThreadLocal<uint>(() => 0);

TaskingContainer tc = new TaskingContainer();
tc.ResetDAGNotThreadSafe();

freg.StartThreadpool();

freg.reg.num_threads = 1;

single_thread.Clear();
barrier.Dispose();
barrier = new Barrier(1);
}

Expand Down Expand Up @@ -887,32 +907,65 @@ public static TaskUUID Task(Action action, params TaskUUID[] depends)
}

/// <summary>
/// Wait for all tasks in the queue to complete.
/// Is injected into a Thread's work by the Region constructor, but can also be called manually.
/// The injection is done to ensure that Parallel.Taskwait() is called before a Parallel.ParallelRegion() terminates,
/// guaranteeing all tasks submitted complete.
/// Acts as an implicit Barrier().
/// Wait for selected tasks in the queue to complete, or for the full queue to empty if no tasks are specified.
/// Acts as an implicit Barrier() if it is not called from within a task.
/// </summary>
public static void Taskwait()
/// <param name="tasks">The tasks to wait on.</param>
/// <exception cref="ImproperTaskwaitUsageException">Thrown if a parameter-less taskwait is called from within a thread, which leads to deadlock.</exception>
public static void Taskwait(params TaskUUID[] tasks)
{
Func<int, bool> check;
ForkedRegion fr = new ForkedRegion();
TaskingContainer tc = new TaskingContainer();
int tasks_remaining;
int tasks_completed = 0;

Barrier();
if (tasks.Length == 0)
{
if (task_nesting.Value > 0)
throw new ImproperTaskwaitUsageException("Using the default taskwait from within a task will result in a deadlock. Try specifying the task to wait on as an argument.");

check = tr => tr > 0;
}
else
{
check = _ =>
{
while (tasks_completed < tasks.Length && tc.TaskIsComplete(tasks[tasks_completed].GetUUID()))
++tasks_completed;

return tasks_completed != tasks.Length;
};
}

if (task_nesting.Value == 0)
Barrier();

++task_nesting.Value;

do
{
if (canceled)
throw new ThreadInterruptedException();

do if (tc.GetNextTask(out Action do_action, out ulong uuid, out tasks_remaining))
if (tc.GetNextTask(out Action do_action, out ulong uuid, out tasks_remaining))
{
do_action();
tc.CompleteTask(uuid);
}
while (tasks_remaining > 0);

Barrier();
}
while (check(tasks_remaining));

tc.ResetDAG();
if (--task_nesting.Value == 0)
{
Barrier();

Barrier();
if (tasks_remaining == 0)
{
tc.ResetDAG();
Barrier();
}
}
}

/// <summary>
Expand Down
25 changes: 23 additions & 2 deletions DotMP/Tasking.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
* This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser
* General Public License as published by the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
* implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this library; if not,
* write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
Expand Down Expand Up @@ -50,6 +50,17 @@ internal void ResetDAG()
});
}

/// <summary>
/// Resets the DAG to a default state.
/// Allows the garbage collector to collect unused data.
/// Unlike ResetDAG(), this version is not thread-safe!
/// </summary>
internal void ResetDAGNotThreadSafe()
{
dag.Dispose();
dag = new DAG<ulong, Action>();
}

/// <summary>
/// Gets the next task from the queue.
/// </summary>
Expand Down Expand Up @@ -105,6 +116,16 @@ internal void CompleteTask(ulong uuid)
{
dag.CompleteItem(uuid);
}

/// <summary>
/// Determines if a task has been completed.
/// </summary>
/// <param name="uuid">The ID of the task to check completion.</param>
/// <returns>Whether or not the task has been completed.</returns>
internal bool TaskIsComplete(ulong uuid)
{
return dag.TaskIsComplete(uuid);
}
}

/// <summary>
Expand Down

0 comments on commit 4c02a0b

Please sign in to comment.