Skip to content

Commit

Permalink
Merge pull request #124 from computablee/bug-fixes
Browse files Browse the repository at this point in the history
Add checks for overflow in schedulers
  • Loading branch information
computablee authored Nov 26, 2023
2 parents 4c02a0b + 94200ed commit 2db30d0
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 85 deletions.
12 changes: 12 additions & 0 deletions DotMP-Tests/ParallelTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,18 @@ public void Improper_taskwait_should_except()
});
}

/// <summary>
/// Ensures that overflows in the schedulers properly throw exceptions.
/// </summary>
[Fact]
public void Boundary_parallelfor_should_except()
{
Assert.Throws<DotMP.Exceptions.InternalSchedulerException>(() =>
{
DotMP.Parallel.ParallelFor(0, int.MaxValue, i => { });
});
}

/// <summary>
/// A sample workload for DotMP.Parallel.ParallelFor().
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion DotMP/DotMP.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<TargetFrameworks>net6.0;net7.0;net8.0</TargetFrameworks>
<RootNamespace>DotMP</RootNamespace>
<PackageId>DotMP</PackageId>
<Version>1.6.0-pre2</Version>
<Version>1.6.0</Version>
<Authors>Phillip Allen Lane,et al.</Authors>
<PackageDescription>A library for fork-join parallelism in .NET, with an OpenMP-like API.</PackageDescription>
<RepositoryUrl>https://github.com/computablee/DotMP</RepositoryUrl>
Expand Down
12 changes: 12 additions & 0 deletions DotMP/Exceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,16 @@ public class ImproperTaskwaitUsageException : Exception
/// <param name="msg">The message to associate with the exception.</param>
public ImproperTaskwaitUsageException(string msg) : base(msg) { }
}

/// <summary>
/// Exception thrown if the internal schedulers encounter an overflow.
/// </summary>
public class InternalSchedulerException : Exception
{
/// <summary>
/// Constructor with a message.
/// </summary>
/// <param name="msg">The message to associate with the exception.</param>
public InternalSchedulerException(string msg) : base(msg) { }
}
}
160 changes: 94 additions & 66 deletions DotMP/Schedule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,18 @@ private struct IterWrapper
/// <param name="end">The end of the loop, exclusive.</param>
/// <param name="num_threads">The number of threads.</param>
/// <param name="chunk_size">The chunk size.</param>
/// <exception cref="OverflowException">Thrown if there's an internal scheduler overflow.</exception>
public override void LoopInit(int start, int end, uint num_threads, uint chunk_size)
{
this.chunk_size = chunk_size;
this.end = end;
advance_by = (int)(chunk_size * num_threads);
curr_iters = new IterWrapper[num_threads];
for (int i = 0; i < num_threads; i++)
curr_iters[i].curr_iter = start + ((int)chunk_size * i);
checked
{
this.chunk_size = chunk_size;
this.end = end;
advance_by = (int)(chunk_size * num_threads);
curr_iters = new IterWrapper[num_threads];
for (int i = 0; i < num_threads; i++)
curr_iters[i].curr_iter = start + ((int)chunk_size * i);
}
}

/// <summary>
Expand All @@ -220,11 +224,15 @@ public override void LoopInit(int start, int end, uint num_threads, uint chunk_s
/// <param name="thread_id">The thread ID.</param>
/// <param name="start">The start of the chunk, inclusive.</param>
/// <param name="end">The end of the chunk, exclusive.</param>
/// <exception cref="OverflowException">Thrown if there's an internal scheduler overflow.</exception>
public override void LoopNext(int thread_id, out int start, out int end)
{
start = curr_iters[thread_id].curr_iter;
end = Math.Min(start + (int)chunk_size, this.end);
curr_iters[thread_id].curr_iter += advance_by;
checked
{
start = curr_iters[thread_id].curr_iter;
end = Math.Min(start + (int)chunk_size, this.end);
curr_iters[thread_id].curr_iter += advance_by;
}
}
}

Expand Down Expand Up @@ -266,10 +274,14 @@ public override void LoopInit(int start, int end, uint num_threads, uint chunk_s
/// <param name="thread_id">The thread ID.</param>
/// <param name="start">The start of the chunk, inclusive.</param>
/// <param name="end">The end of the chunk, exclusive.</param>
/// <exception cref="OverflowException">Thrown if there's an internal scheduler overflow.</exception>
public override void LoopNext(int thread_id, out int start, out int end)
{
start = Interlocked.Add(ref this.start, (int)chunk_size) - (int)chunk_size;
end = Math.Min(start + (int)chunk_size, this.end);
checked
{
start = Interlocked.Add(ref this.start, (int)chunk_size) - (int)chunk_size;
end = Math.Min(start + (int)chunk_size, this.end);
}
}
}

Expand Down Expand Up @@ -321,19 +333,23 @@ public override void LoopInit(int start, int end, uint num_threads, uint chunk_s
/// <param name="thread_id">The thread ID.</param>
/// <param name="start">The start of the chunk, inclusive.</param>
/// <param name="end">The end of the chunk, exclusive.</param>
/// <exception cref="OverflowException">Thrown if there's an internal scheduler overflow.</exception>
public override void LoopNext(int thread_id, out int start, out int end)
{
int chunk_size;

lock (sched_lock)
checked
{
start = this.start;
chunk_size = (int)Math.Max(this.chunk_size, (this.end - start) / (num_threads * 2));
int chunk_size;

this.start += chunk_size;
}
lock (sched_lock)
{
start = this.start;
chunk_size = (int)Math.Max(this.chunk_size, (this.end - start) / (num_threads * 2));

this.start += chunk_size;
}

end = Math.Min(start + chunk_size, this.end);
end = Math.Min(start + chunk_size, this.end);
}
}
}

Expand Down Expand Up @@ -423,29 +439,33 @@ private struct Queue
/// <param name="end">The end of the loop, exclusive.</param>
/// <param name="num_threads">The number of threads.</param>
/// <param name="chunk_size">The chunk size.</param>
/// <exception cref="OverflowException">Thrown if there's an internal scheduler overflow.</exception>
public override void LoopInit(int start, int end, uint num_threads, uint chunk_size)
{
this.queues = new Queue[num_threads];
this.chunk_size = chunk_size;
this.threads_with_remaining_work = num_threads;
this.num_threads = num_threads;
checked
{
this.queues = new Queue[num_threads];
this.chunk_size = chunk_size;
this.threads_with_remaining_work = num_threads;
this.num_threads = num_threads;

int ctr = start;
int div = (end - start) / (int)num_threads;
int ctr = start;
int div = (end - start) / (int)num_threads;

for (int i = 0; i < num_threads - 1; i++)
{
queues[i].start = ctr;
ctr += div;
queues[i].end = ctr;
queues[i].work_remaining = true;
queues[i].qlock = new object();
}
for (int i = 0; i < num_threads - 1; i++)
{
queues[i].start = ctr;
ctr += div;
queues[i].end = ctr;
queues[i].work_remaining = true;
queues[i].qlock = new object();
}

queues[num_threads - 1].start = ctr;
queues[num_threads - 1].end = end;
queues[num_threads - 1].work_remaining = true;
queues[num_threads - 1].qlock = new object();
queues[num_threads - 1].start = ctr;
queues[num_threads - 1].end = end;
queues[num_threads - 1].work_remaining = true;
queues[num_threads - 1].qlock = new object();
}
}

/// <summary>
Expand All @@ -454,25 +474,29 @@ public override void LoopInit(int start, int end, uint num_threads, uint chunk_s
/// <param name="thread_id">The thread ID.</param>
/// <param name="start">The start of the chunk, inclusive.</param>
/// <param name="end">The end of the chunk, exclusive.</param>
/// <exception cref="OverflowException">Thrown if there's an internal scheduler overflow.</exception>
public override void LoopNext(int thread_id, out int start, out int end)
{
do
checked
{
lock (queues[thread_id].qlock)
do
{
start = queues[thread_id].start;
end = Math.Min(start + (int)chunk_size, queues[thread_id].end);

if (start < end)
lock (queues[thread_id].qlock)
{
queues[thread_id].start += (int)chunk_size;
return;
start = queues[thread_id].start;
end = Math.Min(start + (int)chunk_size, queues[thread_id].end);

if (start < end)
{
queues[thread_id].start += (int)chunk_size;
return;
}
}
}

StealHandler(thread_id);
StealHandler(thread_id);
}
while (threads_with_remaining_work > 0);
}
while (threads_with_remaining_work > 0);
}

/// <summary>
Expand All @@ -499,35 +523,39 @@ private void StealHandler(int thread_id)
/// </summary>
/// <param name="thread_id">The thread ID.</param>
/// <returns>Whether or not the steal was successful.</returns>
/// <exception cref="OverflowException">Thrown if there's an internal scheduler overflow.</exception>
private bool DoSteal(int thread_id)
{
int rng = Random.Shared.Next((int)num_threads);
int new_start, new_end;

lock (queues[rng].qlock)
checked
{
if (queues[rng].start < queues[rng].end)
int rng = Random.Shared.Next((int)num_threads);
int new_start, new_end;

lock (queues[rng].qlock)
{
int steal_size = (queues[rng].end - queues[rng].start + 1) / 2;
if (queues[rng].start < queues[rng].end)
{
int steal_size = (queues[rng].end - queues[rng].start + 1) / 2;

new_start = queues[rng].start;
new_end = queues[rng].start + steal_size;
new_start = queues[rng].start;
new_end = queues[rng].start + steal_size;

queues[rng].start = new_end;
queues[rng].start = new_end;
}
else
{
return false;
}
}
else

lock (queues[thread_id].qlock)
{
return false;
queues[thread_id].start = new_start;
queues[thread_id].end = new_end;
}
}

lock (queues[thread_id].qlock)
{
queues[thread_id].start = new_start;
queues[thread_id].end = new_end;
return true;
}

return true;
}
}
}
Expand Down
31 changes: 20 additions & 11 deletions DotMP/WorkShare.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@
* This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser
* General Public License as published by the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
* implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with this library; if not,
* write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/

using System.Collections.Generic;
using System.Threading;
using System;
using DotMP.Exceptions;

namespace DotMP
{
Expand Down Expand Up @@ -245,6 +246,7 @@ internal void SetLocal<T>(ref T local)
/// </summary>
/// <typeparam name="T">The type of reductions, if applicable.</typeparam>
/// <param name="forAction">The function to be executed.</param>
/// <exception cref="InternalSchedulerException">Thrown if the internal schedulers throw an exception.</exception>
internal void PerformLoop<T>(ForAction<T> forAction)
{
int start = this.start;
Expand All @@ -258,19 +260,26 @@ internal void PerformLoop<T>(ForAction<T> forAction)
if (forAction.IsReduction)
SetLocal(ref local);

Parallel.Master(() => scheduler.LoopInit(start, end, num_threads, chunk_size));
Parallel.Barrier();
try
{
Parallel.Master(() => scheduler.LoopInit(start, end, num_threads, chunk_size));
Parallel.Barrier();

int chunk_start, chunk_end;
ref int curr_iter = ref working_iters[thread_id];
int chunk_start, chunk_end;
ref int curr_iter = ref working_iters[thread_id];

do
do
{
scheduler.LoopNext(thread_id, out chunk_start, out chunk_end);
if (chunk_start < chunk_end)
forAction.PerformLoop(ref curr_iter, chunk_start, chunk_end, ref local);
}
while (chunk_start < chunk_end);
}
catch (OverflowException)
{
scheduler.LoopNext(thread_id, out chunk_start, out chunk_end);
if (chunk_start < chunk_end)
forAction.PerformLoop(ref curr_iter, chunk_start, chunk_end, ref local);
throw new InternalSchedulerException(string.Format("An internal overflow exception has occurred within the loop scheduler. This most often happens when the upper bound of the loop is too close to {0}.", int.MaxValue));
}
while (chunk_start < chunk_end);

if (forAction.IsReduction)
AddReductionValue(local);
Expand Down
10 changes: 3 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ examples-seq:
$(DN) build -c $(BUILD) examples/Serial/KNN

tests:
$(DN) build -c $(BUILD) DotMP-Tests
$(DN) build -c Debug DotMP-Tests

test:
$(DN) test -c $(BUILD) -l "console;verbosity=detailed" -p:CollectCoverage=true -p:CoverletOutputFormat=opencover DotMP-Tests
$(DN) test -c Debug -l "console;verbosity=detailed" -p:CollectCoverage=true -p:CoverletOutputFormat=opencover DotMP-Tests

build:
$(DN) build -c $(BUILD) -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg DotMP
Expand All @@ -42,11 +42,7 @@ docs: ProcessedREADME.md
doxygen

pack: ProcessedREADME.md build
$(DN) pack -c $(BUILD) DotMP
cp ./DotMP/bin/Release/net6.0/DotMP.dll ./DotMP/bin/Release/DotMP-NET6.0.dll
cp ./DotMP/bin/Release/net7.0/DotMP.dll ./DotMP/bin/Release/DotMP-NET7.0.dll
cp ./DotMP/bin/Release/net6.0/DotMP.pdb ./DotMP/bin/Release/DotMP-NET6.0.pdb
cp ./DotMP/bin/Release/net7.0/DotMP.pdb ./DotMP/bin/Release/DotMP-NET7.0.pdb
$(DN) pack -c $(BUILD) /p:ContinuousIntegrationBuild=true DotMP

clean:
rm -f ProcessedREADME.md
Expand Down

0 comments on commit 2db30d0

Please sign in to comment.