Skip to content

Commit

Permalink
Internal deadlock avoidance strategies.
Browse files Browse the repository at this point in the history
  • Loading branch information
NTDLS committed Jan 4, 2025
1 parent 41782e4 commit e0b5ee4
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 46 deletions.
2 changes: 1 addition & 1 deletion NTDLS.Katzebase.Api/NTDLS.Katzebase.Api.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="NTDLS.Helpers" Version="1.3.11" />
<PackageReference Include="NTDLS.ReliableMessaging" Version="1.11.5" />
<PackageReference Include="NTDLS.ReliableMessaging" Version="1.11.6" />
</ItemGroup>

<ItemGroup>
Expand Down
52 changes: 26 additions & 26 deletions NTDLS.Katzebase.Engine/Atomicity/Transaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,13 @@ public TransactionSnapshot Snapshot()
IsCancelled = IsCancelled
};

GrantedLockCache.Read((obj) => { snapshot.GrantedLockCache = new HashSet<string>(obj); });
BlockedByKeys.Read((obj) => { snapshot.BlockedByKeys = obj.Select(o => o.Snapshot()).ToList(); });
HeldLockKeys.Read((obj) => { snapshot.HeldLockKeys = obj.Select(o => o.Snapshot()).ToList(); });
TemporarySchemas.Read((obj) => { snapshot.TemporarySchemas = new HashSet<string>(obj); });
FilesReadForCache.Read((obj) => { snapshot.FilesReadForCache = new HashSet<string>(obj); });
DeferredIOs.Read((obj) => { snapshot.DeferredIOs = obj.Snapshot(); });
Atoms.Read((obj) => { snapshot.Atoms = obj.Select(o => o.Snapshot()).ToList(); });
GrantedLockCache.DeadlockAvoidanceTryRead(10, () => _core.IsRunning, (obj) => { snapshot.GrantedLockCache = new HashSet<string>(obj); });
BlockedByKeys.DeadlockAvoidanceTryRead(10, () => _core.IsRunning, (obj) => { snapshot.BlockedByKeys = obj.Select(o => o.Snapshot()).ToList(); });
HeldLockKeys.DeadlockAvoidanceTryRead(10, () => _core.IsRunning, (obj) => { snapshot.HeldLockKeys = obj.Select(o => o.Snapshot()).ToList(); });
TemporarySchemas.DeadlockAvoidanceTryRead(10, () => _core.IsRunning, (obj) => { snapshot.TemporarySchemas = new HashSet<string>(obj); });
FilesReadForCache.DeadlockAvoidanceTryRead(10, () => _core.IsRunning, (obj) => { snapshot.FilesReadForCache = new HashSet<string>(obj); });
DeferredIOs.DeadlockAvoidanceTryRead(10, () => _core.IsRunning, (obj) => { snapshot.DeferredIOs = obj.Snapshot(); });
Atoms.DeadlockAvoidanceTryRead(10, () => _core.IsRunning, (obj) => { snapshot.Atoms = obj.Select(o => o.Snapshot()).ToList(); });

return snapshot;
}
Expand Down Expand Up @@ -302,9 +302,9 @@ public void SetDeadlocked()

private void ReleaseLocks()
{
GrantedLockCache.Write((obj) => obj.Clear());
GrantedLockCache.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) => obj.Clear());

HeldLockKeys.Write((obj) =>
HeldLockKeys.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) =>
{
foreach (var key in obj)
{
Expand All @@ -315,9 +315,9 @@ private void ReleaseLocks()

internal void ReleaseLock(ObjectLockKey objectLock)
{
GrantedLockCache.Write((obj) => obj.Remove(objectLock.Key));
GrantedLockCache.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) => obj.Remove(objectLock.Key));

HeldLockKeys.Write((obj) =>
HeldLockKeys.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) =>
{
obj.Remove(objectLock);
});
Expand Down Expand Up @@ -479,7 +479,7 @@ public Transaction(EngineCore core, TransactionManager transactionManager, ulong
StartTime = DateTime.UtcNow;
ProcessId = processId;

DeferredIOs.Write((obj) =>
DeferredIOs.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) =>
{
obj.SetCore(core);
});
Expand Down Expand Up @@ -507,7 +507,7 @@ private bool IsFileAlreadyRecorded(string filePath)
{
bool result = false;

Atoms.Read((obj) =>
Atoms.DeadlockAvoidanceTryRead(10, () => _core.IsRunning, (obj) =>
{
result = obj.Exists(o => o.Key.Is(filePath));
});
Expand All @@ -525,7 +525,7 @@ public void RecordFileCreate(string filePath)

var ptRecording = Instrumentation?.CreateToken(InstrumentationTracker.PerformanceCounter.AtomRecording);

Atoms.Write((obj) =>
Atoms.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) =>
{
if (IsFileAlreadyRecorded(filePath))
{
Expand Down Expand Up @@ -560,7 +560,7 @@ public void RecordDirectoryCreate(string path)
EnsureActive();

var ptRecording = Instrumentation?.CreateToken(InstrumentationTracker.PerformanceCounter.AtomRecording);
Atoms.Write((obj) =>
Atoms.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) =>
{
if (IsFileAlreadyRecorded(path))
{
Expand Down Expand Up @@ -595,9 +595,9 @@ public void RecordPathDelete(string diskPath)

var ptRecording = Instrumentation?.CreateToken(InstrumentationTracker.PerformanceCounter.AtomRecording);

DeferredIOs.Write((obj) => obj.RemoveItemsWithPrefix(diskPath));
DeferredIOs.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) => obj.RemoveItemsWithPrefix(diskPath));

Atoms.Write((obj) =>
Atoms.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) =>
{
if (IsFileAlreadyRecorded(diskPath))
{
Expand Down Expand Up @@ -637,9 +637,9 @@ public void RecordFileDelete(string filePath)

var ptRecording = Instrumentation?.CreateToken(InstrumentationTracker.PerformanceCounter.AtomRecording);

DeferredIOs.Write((obj) => obj.Remove(filePath));
DeferredIOs.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) => obj.Remove(filePath));

Atoms.Write((obj) =>
Atoms.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) =>
{
if (IsFileAlreadyRecorded(filePath))
{
Expand Down Expand Up @@ -678,7 +678,7 @@ public void RecordFileRead(string filePath)

var ptRecording = Instrumentation?.CreateToken(InstrumentationTracker.PerformanceCounter.AtomRecording);

FilesReadForCache.WriteNullable((obj) => obj.Add(filePath));
FilesReadForCache.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) => obj.Add(filePath));

ptRecording?.StopAndAccumulate();
}
Expand All @@ -699,7 +699,7 @@ public void RecordFileAlter(string filePath)

var ptRecording = Instrumentation?.CreateToken(InstrumentationTracker.PerformanceCounter.AtomRecording);

Atoms.Write((obj) =>
Atoms.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) =>
{
if (IsFileAlreadyRecorded(filePath))
{
Expand Down Expand Up @@ -755,7 +755,7 @@ public void Rollback()
var ptRollback = Instrumentation?.CreateToken(InstrumentationTracker.PerformanceCounter.Rollback);
try
{
Atoms.Write((obj) =>
Atoms.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) =>
{
var rollbackActions = obj.OrderByDescending(o => o.Sequence);

Expand Down Expand Up @@ -799,7 +799,7 @@ public void Rollback()
}
}

FilesReadForCache.Write((obj) =>
FilesReadForCache.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) =>
{
foreach (var file in obj)
{
Expand Down Expand Up @@ -874,7 +874,7 @@ public bool Commit()

try
{
DeferredIOs.Write((obj) => obj.CommitDeferredDiskIO());
DeferredIOs.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) => obj.CommitDeferredDiskIO());
CleanupTransaction();
_transactionManager.RemoveByProcessId(ProcessId);
DeleteTemporarySchemas();
Expand Down Expand Up @@ -909,7 +909,7 @@ private void DeleteTemporarySchemas()
{
_core.EnsureNotNull();

TemporarySchemas.Write((obj) =>
TemporarySchemas.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) =>
{
if (obj.Count != 0)
{
Expand All @@ -936,7 +936,7 @@ private void CleanupTransaction()
_transactionLogHandle = null;
}

Atoms.Write((obj) =>
Atoms.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) =>
{
foreach (var record in obj)
{
Expand Down
5 changes: 5 additions & 0 deletions NTDLS.Katzebase.Engine/EngineCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace NTDLS.Katzebase.Engine
{
public class EngineCore
{
public bool IsRunning { get; private set; }
internal IOManager IO;
internal LockManager Locking;
internal CacheManager Cache;
Expand Down Expand Up @@ -116,10 +117,14 @@ public void Start()
LogManager.Information("Recovery complete.");

Heartbeat.Start();

IsRunning = true;
}

public void Stop()
{
IsRunning = false;

LogManager.Information("Stopping heartbeat pool.");
Heartbeat.Stop();

Expand Down
34 changes: 17 additions & 17 deletions NTDLS.Katzebase.Engine/Interactions/Management/LockManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ internal void Release(ObjectLock objectLock)
{
try
{
_collection.Write((obj) => obj.Remove(objectLock));
_collection.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) => obj.Remove(objectLock));
}
catch (Exception ex)
{
Expand Down Expand Up @@ -141,7 +141,7 @@ internal Dictionary<TransactionSnapshot, ObjectLockIntention> SnapshotWaitingTra

//Record that we are waiting on the grant. This is used for deadlock detection.
var ptPendingGrantLock = transaction.Instrumentation?.CreateToken(InstrumentationTracker.PerformanceCounter.PendingGrantLock, "Write");
_pendingGrants.Write((pendingGrants) =>
_pendingGrants.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (pendingGrants) =>
{
ptPendingGrantLock?.StopAndAccumulate();
pendingGrants.Add(pendingGrantKey, new(transaction, intention));
Expand Down Expand Up @@ -181,7 +181,7 @@ internal Dictionary<TransactionSnapshot, ObjectLockIntention> SnapshotWaitingTra
{
//We got a lock, record it and return the key to the caller.
var ptGrantedLockCacheWrite = transaction.Instrumentation?.CreateToken(InstrumentationTracker.PerformanceCounter.GrantedLockCache, "Read");
transaction.GrantedLockCache.Write((obj) => obj.Add(intention.Key));
transaction.GrantedLockCache.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) => obj.Add(intention.Key));
ptGrantedLockCacheWrite?.StopAndAccumulate();
return lockKey;
}
Expand Down Expand Up @@ -211,7 +211,7 @@ internal Dictionary<TransactionSnapshot, ObjectLockIntention> SnapshotWaitingTra

//Let other transactions know that we are no longer waiting on this lock.
var ptPendingGrantLock = transaction.Instrumentation?.CreateToken(InstrumentationTracker.PerformanceCounter.PendingGrantLock, "Read");
_pendingGrants.Write((pendingGrants) =>
_pendingGrants.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (pendingGrants) =>
{
ptPendingGrantLock?.StopAndAccumulate();
pendingGrants.Remove(pendingGrantKey);
Expand Down Expand Up @@ -252,7 +252,7 @@ internal Dictionary<TransactionSnapshot, ObjectLockIntention> SnapshotWaitingTra
lockedObjects.Add(lockedObject);

lockKey = lockedObject.IssueSingleUseKey(transaction, intention);
transaction.HeldLockKeys.Write((obj) => obj.Add(lockKey));
transaction.HeldLockKeys.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) => obj.Add(lockKey));

var lockWaitTime = (DateTime.UtcNow - intention.CreationTime).TotalMilliseconds;
_core.Health.IncrementContinuous(HealthCounterType.LockWaitMs, lockWaitTime);
Expand All @@ -271,7 +271,7 @@ internal Dictionary<TransactionSnapshot, ObjectLockIntention> SnapshotWaitingTra

if (blockers.Count == 0)
{
transaction.BlockedByKeys.Write((obj) => obj.Clear());
transaction.BlockedByKeys.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) => obj.Clear());

foreach (var lockedObject in lockedObjects)
{
Expand All @@ -286,7 +286,7 @@ internal Dictionary<TransactionSnapshot, ObjectLockIntention> SnapshotWaitingTra
}

lockKey = lockedObject.IssueSingleUseKey(transaction, intention);
transaction.HeldLockKeys.Write((obj) => obj.Add(lockKey));
transaction.HeldLockKeys.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) => obj.Add(lockKey));
}

var lockWaitTime = (DateTime.UtcNow - intention.CreationTime).TotalMilliseconds;
Expand All @@ -297,7 +297,7 @@ internal Dictionary<TransactionSnapshot, ObjectLockIntention> SnapshotWaitingTra
}
else
{
transaction.BlockedByKeys.Write((obj) =>
transaction.BlockedByKeys.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) =>
{
obj.AddRange(blockers.Distinct());
obj.Clear();
Expand All @@ -318,7 +318,7 @@ internal Dictionary<TransactionSnapshot, ObjectLockIntention> SnapshotWaitingTra

if (blockers.Count == 0)
{
transaction.BlockedByKeys.Write((obj) => obj.Clear());
transaction.BlockedByKeys.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) => obj.Clear());

foreach (var lockedObject in lockedObjects)
{
Expand All @@ -333,7 +333,7 @@ internal Dictionary<TransactionSnapshot, ObjectLockIntention> SnapshotWaitingTra
}

lockKey = lockedObject.IssueSingleUseKey(transaction, intention);
transaction.HeldLockKeys.Write((obj) => obj.Add(lockKey));
transaction.HeldLockKeys.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) => obj.Add(lockKey));
}

var lockWaitTime = (DateTime.UtcNow - intention.CreationTime).TotalMilliseconds;
Expand All @@ -344,7 +344,7 @@ internal Dictionary<TransactionSnapshot, ObjectLockIntention> SnapshotWaitingTra
}
else
{
transaction.BlockedByKeys.Write((obj) =>
transaction.BlockedByKeys.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) =>
{
obj.Clear();
obj.AddRange(blockers.Distinct());
Expand All @@ -364,7 +364,7 @@ internal Dictionary<TransactionSnapshot, ObjectLockIntention> SnapshotWaitingTra

if (blockers.Count == 0)
{
transaction.BlockedByKeys.Write((obj) => obj.Clear());
transaction.BlockedByKeys.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) => obj.Clear());

foreach (var lockedObject in lockedObjects)
{
Expand All @@ -379,7 +379,7 @@ internal Dictionary<TransactionSnapshot, ObjectLockIntention> SnapshotWaitingTra
}

lockKey = lockedObject.IssueSingleUseKey(transaction, intention);
transaction.HeldLockKeys.Write((obj) => obj.Add(lockKey));
transaction.HeldLockKeys.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) => obj.Add(lockKey));
}

var lockWaitTime = (DateTime.UtcNow - intention.CreationTime).TotalMilliseconds;
Expand All @@ -390,7 +390,7 @@ internal Dictionary<TransactionSnapshot, ObjectLockIntention> SnapshotWaitingTra
}
else
{
transaction.BlockedByKeys.Write((obj) =>
transaction.BlockedByKeys.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) =>
{
obj.Clear();
obj.AddRange(blockers.Distinct());
Expand All @@ -410,7 +410,7 @@ internal Dictionary<TransactionSnapshot, ObjectLockIntention> SnapshotWaitingTra

if (blockers.Count == 0) //If there are no existing un-owned locks.
{
transaction.BlockedByKeys.Write((obj) => obj.Clear());
transaction.BlockedByKeys.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) => obj.Clear());

foreach (var lockedObject in lockedObjects)
{
Expand All @@ -426,7 +426,7 @@ internal Dictionary<TransactionSnapshot, ObjectLockIntention> SnapshotWaitingTra

lockKey = lockedObject.IssueSingleUseKey(transaction, intention);

transaction.HeldLockKeys.Write((obj) => obj.Add(lockKey));
transaction.HeldLockKeys.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) => obj.Add(lockKey));
}

var lockWaitTime = (DateTime.UtcNow - intention.CreationTime).TotalMilliseconds;
Expand All @@ -437,7 +437,7 @@ internal Dictionary<TransactionSnapshot, ObjectLockIntention> SnapshotWaitingTra
}
else
{
transaction.BlockedByKeys.Write((obj) =>
transaction.BlockedByKeys.DeadlockAvoidanceTryWrite(10, () => _core.IsRunning, (obj) =>
{
obj.Clear();
obj.AddRange(blockers.Distinct());
Expand Down
4 changes: 2 additions & 2 deletions NTDLS.Katzebase.Engine/NTDLS.Katzebase.Engine.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="CoreCLR-NCalc" Version="3.1.253" />
<PackageReference Include="NTDLS.DelegateThreadPooling" Version="1.5.10" />
<PackageReference Include="NTDLS.FastMemoryCache" Version="1.7.12" />
<PackageReference Include="NTDLS.DelegateThreadPooling" Version="1.5.11" />
<PackageReference Include="NTDLS.FastMemoryCache" Version="1.7.13" />
<PackageReference Include="Serilog" Version="4.2.0" />
</ItemGroup>

Expand Down

0 comments on commit e0b5ee4

Please sign in to comment.