Skip to content

Commit

Permalink
small fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
timon committed Oct 11, 2023
1 parent 234fcee commit f39ed0d
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 8 deletions.
18 changes: 13 additions & 5 deletions MemoizR.Reactive/Reaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ internal Reaction(Func<Task> fn, Context context, TaskScheduler? sheduler = null
this.State = CacheState.CacheDirty;
this.label = label;

// The reaction must be initialized to build the Sources.
Update().GetAwaiter().GetResult();
_ = Init();
}

public void Pause()
Expand All @@ -31,6 +30,15 @@ public Task Resume()
return UpdateIfNecessary();
}

private async Task Init()
{
using (await context.contextLock.UpgradeableLockAsync())
{
// The reaction must be initialized to build the Sources.
await Update();
}
}

// Update the reaction if dirty, or a parent turns out to be dirty.
internal async Task UpdateIfNecessary()
{
Expand Down Expand Up @@ -77,7 +85,7 @@ private async Task Update()
State = CacheState.CacheDirty;
return;
}

// Evaluate the reactive function body, dynamically capturing any other reactives used.
var prevReaction = context.CurrentReaction;
var prevGets = context.CurrentGets;
Expand All @@ -91,7 +99,7 @@ private async Task Update()
{
if (!isPaused)
{
var t = sheduler != null
var t = sheduler != null && sheduler.Id != Thread.CurrentThread.ManagedThreadId
? Task.Factory.StartNew(async () => await fn(), CancellationToken.None, TaskCreationOptions.None, sheduler)
: fn();

Expand Down Expand Up @@ -122,7 +130,7 @@ private async Task Update()
var source = Sources[i];
if (!source.Observers.Any())
{
source.Observers = (new[] { this }).ToArray();
source.Observers = new[] { this };
}
else
{
Expand Down
2 changes: 1 addition & 1 deletion MemoizR.StructuredConcurrency/ConcurrentMapReduce.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private async Task Update()
var source = Sources[i];
if (!source.Observers.Any())
{
source.Observers = (new[] { this }).ToArray();
source.Observers = new[] { this };
}
else
{
Expand Down
2 changes: 1 addition & 1 deletion MemoizR.StructuredConcurrency/StructuredJobBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public async Task<T> Run()
AddConcurrentWork();
tasks = this.tasks;
}
await Task.WhenAll(tasks.ToArray());
await Task.WhenAll(tasks);
return result!;
}
catch
Expand Down
2 changes: 1 addition & 1 deletion MemoizR/MemoizR.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ private async Task Update()
var source = Sources[i];
if (!source.Observers.Any())
{
source.Observers = (new[] { this }).ToArray();
source.Observers = new[] { this };
}
else
{
Expand Down

0 comments on commit f39ed0d

Please sign in to comment.