diff --git a/src/Paprika.Benchmarks/PooledSpanDictionaryBenchmarks.cs b/src/Paprika.Benchmarks/PooledSpanDictionaryBenchmarks.cs index 659139cc..b7eafbf8 100644 --- a/src/Paprika.Benchmarks/PooledSpanDictionaryBenchmarks.cs +++ b/src/Paprika.Benchmarks/PooledSpanDictionaryBenchmarks.cs @@ -22,6 +22,7 @@ public class PooledSpanDictionaryBenchmarks }; private readonly PooledSpanDictionary _varLengthKeys; + private readonly PooledSpanDictionary _readWrite; private const int VarLengthKeyCollisions = 8; private const ulong VarLengthKeyCollisionHash = 2348598349058394; @@ -51,6 +52,8 @@ public PooledSpanDictionaryBenchmarks() { _bigDict.Set(VarLengthKey[..VarLengthKeyCollisions], VarLengthKeyCollisionHash, Value32Bytes, 1); } + + _readWrite = new PooledSpanDictionary(new BufferPool(128, BufferPool.PageTracking.None, null)); } [Benchmark] @@ -124,16 +127,16 @@ public int Read_missing_with_no_hash_collisions() [Benchmark] public int Read_write_small() { - using var dict = new PooledSpanDictionary(_pool, false); - Span key = stackalloc byte[2]; var count = 0; for (byte i = 0; i < 255; i++) { key[0] = i; - dict.Set(key, i, key, 1); - dict.TryGet(key, i, out var result); + + _readWrite.Set(key, i, key, 1); + _readWrite.TryGet(key, i, out var result); + count += result[0]; } diff --git a/src/Paprika.Tests/Merkle/Commit.cs b/src/Paprika.Tests/Merkle/Commit.cs index 6707875d..285f5940 100644 --- a/src/Paprika.Tests/Merkle/Commit.cs +++ b/src/Paprika.Tests/Merkle/Commit.cs @@ -183,23 +183,17 @@ private static byte[] Concat(in ReadOnlySpan payload0, in ReadOnlySpan _data = new(Comparer); - public ChildCommit(ICommit commit) - { - _commit = commit; - } - public void Dispose() => _data.Clear(); public ReadOnlySpanOwnerWithMetadata Get(scoped in Key key) { return _data.TryGetValue(GetKey(key), out var value) ? new ReadOnlySpanOwner(value, null).WithDepth(0) - : _commit.Get(key); + : commit.Get(key); } public void Set(in Key key, in ReadOnlySpan payload, EntryType type) @@ -217,7 +211,7 @@ public void Commit() foreach (var kvp in _data) { Key.ReadFrom(kvp.Key, out var key); - _commit.Set(key, kvp.Value); + commit.Set(key, kvp.Value); } } diff --git a/src/Paprika.Tests/Merkle/RootHashFuzzyTests.cs b/src/Paprika.Tests/Merkle/RootHashFuzzyTests.cs index 9c59e9c9..4673f51f 100644 --- a/src/Paprika.Tests/Merkle/RootHashFuzzyTests.cs +++ b/src/Paprika.Tests/Merkle/RootHashFuzzyTests.cs @@ -50,7 +50,7 @@ public void Over_one_mock_commit(string test) [TestCase(nameof(Accounts_100_Storage_1), int.MaxValue, 4)] [TestCase(nameof(Accounts_1_Storage_100), 11, 8)] - [TestCase(nameof(Accounts_1000_Storage_1000), int.MaxValue, 1016, Category = Categories.LongRunning)] + [TestCase(nameof(Accounts_1000_Storage_1000), int.MaxValue, 2500, Category = Categories.LongRunning)] public async Task In_memory_run(string test, int commitEvery, int blockchainPoolSizeMB) { var generator = Build(test); diff --git a/src/Paprika/Chain/Blockchain.cs b/src/Paprika/Chain/Blockchain.cs index 0a2ed2c1..9c4d5b16 100644 --- a/src/Paprika/Chain/Blockchain.cs +++ b/src/Paprika/Chain/Blockchain.cs @@ -1176,73 +1176,35 @@ void ICommit.Visit(CommitAction action, TrieType type) } } - IChildCommit ICommit.GetChild() => new ChildCommit(Pool, this); + IChildCommit ICommit.GetChild() => new ChildCommit(this, this); public IReadOnlySet TouchedAccounts => _touchedAccounts; public IReadOnlyDictionary TouchedStorageSlots => _storageSlots; - class ChildCommit(BufferPool pool, ICommit parent) : RefCountingDisposable, IChildCommit + sealed class ChildCommit(ICommit parent, BlockState owner) : IChildCommit { - private readonly PooledSpanDictionary _dict = new(pool, true); + public ReadOnlySpanOwnerWithMetadata Get(scoped in Key key) => parent.Get(in key); - [SkipLocalsInit] - public ReadOnlySpanOwnerWithMetadata Get(scoped in Key key) - { - var hash = GetHash(key); - var keyWritten = key.WriteTo(stackalloc byte[key.MaxByteLength]); + public void Set(in Key key, in ReadOnlySpan payload, EntryType type = EntryType.Persistent) => + parent.Set(in key, in payload, type); - if (_dict.TryGet(keyWritten, hash, out var result)) - { - AcquireLease(); - return new ReadOnlySpanOwnerWithMetadata(new ReadOnlySpanOwner(result, this), 0); - } + public void Set(in Key key, in ReadOnlySpan payload0, in ReadOnlySpan payload1, + EntryType type = EntryType.Persistent) => parent.Set(in key, in payload0, in payload1, type); - // Don't nest, as reaching to parent should be easy. - return parent.Get(key); - } + public IChildCommit GetChild() => parent.GetChild(); - [SkipLocalsInit] - public void Set(in Key key, in ReadOnlySpan payload, EntryType type) - { - var hash = GetHash(key); - var keyWritten = key.WriteTo(stackalloc byte[key.MaxByteLength]); + public bool Owns(object? actualSpanOwner) => ReferenceEquals(actualSpanOwner, owner); - _dict.Set(keyWritten, hash, payload, (byte)type); - } - - [SkipLocalsInit] - public void Set(in Key key, in ReadOnlySpan payload0, in ReadOnlySpan payload1, EntryType type) + public void Dispose() { - var hash = GetHash(key); - var keyWritten = key.WriteTo(stackalloc byte[key.MaxByteLength]); - - _dict.Set(keyWritten, hash, payload0, payload1, (byte)type); + // NOOP, nothing to dispose } public void Commit() { - foreach (var kvp in _dict) - { - Key.ReadFrom(kvp.Key, out var key); - var type = (EntryType)kvp.Metadata; - - // flush down only volatiles - if (type != EntryType.UseOnce) - { - parent.Set(key, kvp.Value, type); - } - } - } - - public IChildCommit GetChild() => new ChildCommit(pool, this); - - protected override void CleanUp() - { - _dict.Dispose(); + // NOOP, nothing to commit } - - public override string ToString() => _dict.ToString(); } [SkipLocalsInit] diff --git a/src/Paprika/Chain/PooledSpanDictionary.cs b/src/Paprika/Chain/PooledSpanDictionary.cs index baa91e81..0cf1c7d4 100644 --- a/src/Paprika/Chain/PooledSpanDictionary.cs +++ b/src/Paprika/Chain/PooledSpanDictionary.cs @@ -5,6 +5,7 @@ using Paprika.Crypto; using Paprika.Data; using Paprika.Store; +using Paprika.Utils; namespace Paprika.Chain; @@ -13,6 +14,14 @@ namespace Paprika.Chain; /// public class PooledSpanDictionary : IDisposable { + /// + /// Gets the size of the address to the next item. + /// + /// + /// Pointer size. Assumes 64 bits. + /// + private const int PointerSize = 8; + private const int BufferSize = BufferPool.BufferSize; private readonly BufferPool _pool; @@ -32,8 +41,6 @@ public class PooledSpanDictionary : IDisposable /// /// Set to true, if the data written once should not be overwritten. /// This allows to hold values returned by the dictionary through multiple operations. - /// - /// This dictionary uses to store keys buffers to allow concurrent readers /// public PooledSpanDictionary(BufferPool pool, bool preserveOldValues = false) { @@ -43,9 +50,15 @@ public PooledSpanDictionary(BufferPool pool, bool preserveOldValues = false) var pages = new Page[Root.PageCount]; for (var i = 0; i < Root.PageCount; i++) { - pages[i] = RentNewPage(true); + pages[i] = RentNewPage(false); } + ParallelUnbalancedWork.For(0, Root.PageCount, pages, (i, p) => + { + p[i].Clear(); + return p; + }); + _root = new Root(pages); AllocateNewPage(); @@ -83,24 +96,24 @@ public bool TryGet(scoped ReadOnlySpan key, ulong hash, out ReadOnlySpan key, uint leftover, uint bucket) + private SearchResult TryGetImpl(scoped ReadOnlySpan key, uint leftover, uint bucket) { Debug.Assert(BitOperations.LeadingZeroCount(leftover) >= 11, "First 10 bits should be left unused"); - var address = _root[(int)bucket]; - if (address == 0) goto NotFound; + ref var location = ref _root[(int)bucket]; + + var address = Volatile.Read(ref location); + + if (address == UIntPtr.Zero) goto NotFound; - ref var pages = ref MemoryMarshal.GetReference(CollectionsMarshal.AsSpan(_pages)); do { - var (pageNo, atPage) = Math.DivRem(address, Page.PageSize); - - ref var at = ref Unsafe.AsRef((byte*)Unsafe.Add(ref pages, pageNo).Raw.ToPointer() + atPage); + ref var at = ref ReadAtAddress(address); var header = at & PreambleBits; if ((header & DestroyedBit) == 0) @@ -126,12 +139,14 @@ private unsafe SearchResult TryGetImpl(scoped ReadOnlySpan key, uint lefto } // Decode next entry address - address = Unsafe.ReadUnaligned(ref Unsafe.Add(ref at, PreambleLength)); + address = Unsafe.ReadUnaligned(ref Unsafe.Add(ref at, PreambleLength)); } while (address != 0); NotFound: return default; } + private static unsafe ref byte ReadAtAddress(UIntPtr address) => ref Unsafe.AsRef(address.ToPointer()); + private static (uint leftover, uint bucket) GetBucketAndLeftover(ulong hash) => Math.DivRem(Mix(hash), Root.BucketCount); @@ -270,8 +285,6 @@ private void SetImpl(scoped ReadOnlySpan key, uint mixed, ReadOnlySpan= 10, "First 10 bits should be left unused"); - var root = _root[(int)bucket]; - var dataLength = data1.Length + data0.Length; var size = PreambleLength + AddressLength + KeyLengthLength + key.Length + ValueLengthLength + dataLength; @@ -283,9 +296,6 @@ private void SetImpl(scoped ReadOnlySpan key, uint mixed, ReadOnlySpan> 8); destination[2] = (byte)(leftover & 0xFF); - // Write next - Unsafe.WriteUnaligned(ref destination[PreambleLength], root); - // Key length const int keyStart = PreambleLength + AddressLength; destination[keyStart] = (byte)key.Length; @@ -301,7 +311,25 @@ private void SetImpl(scoped ReadOnlySpan key, uint mixed, ReadOnlySpan key, ulong hash) @@ -319,10 +347,10 @@ public void Destroy(scoped ReadOnlySpan key, ulong hash) /// Enumerator walks through all the values beside the ones that were destroyed in this dictionary /// with . /// - public ref struct Enumerator(PooledSpanDictionary dictionary) + public unsafe ref struct Enumerator(PooledSpanDictionary dictionary) { private int _bucket = -1; - private uint _address = 0; + private UIntPtr _address = 0; private ref byte _at; public bool MoveNext() @@ -330,7 +358,7 @@ public bool MoveNext() while (_bucket < Root.BucketCount) { // On empty, scan to the next bucket that is not empty - while (_address == 0) + while (_address == UIntPtr.Zero) { _bucket++; if (_bucket == Root.BucketCount) @@ -338,17 +366,17 @@ public bool MoveNext() return false; } - _address = dictionary._root[_bucket]; + _address = Volatile.Read(ref dictionary._root[_bucket]); } // Scan the bucket till it's not destroyed - while (_address != 0) + while (_address != UIntPtr.Zero) { // Capture the current, move address to next immediately - ref var at = ref dictionary.GetAt(_address); + ref var at = ref ReadAtAddress(_address); // The position is captured in ref at above, move to next - _address = Unsafe.ReadUnaligned(ref Unsafe.Add(ref at, PreambleLength)); + _address = Unsafe.ReadUnaligned(ref Unsafe.Add(ref at, PreambleLength)); if ((at & DestroyedBit) == 0) { @@ -418,14 +446,6 @@ public void Destroy() } } - private ref byte GetAt(uint address) - { - Debug.Assert(address > 0); - - var (pageNo, atPage) = Math.DivRem(address, Page.PageSize); - return ref Unsafe.Add(ref MemoryMarshal.GetReference(_pages[(int)pageNo].Span), (int)atPage); - } - private void AllocateNewPage() { var page = RentNewPage(false); @@ -440,25 +460,39 @@ private Page RentNewPage(bool clear) return page; } - private static uint Mix(ulong hash) => unchecked((uint)((hash >> 32) ^ hash)); + private readonly object _lock = new(); + + private static int Align(int value, int alignment) => (value + (alignment - 1)) & -alignment; - private Span Write(int size, out uint addr) + private Span Write(int size, out UIntPtr addr) { - if (BufferSize - _position < size) + // Memoize to make contention as small as possible + Page current; + int position; + + // Align so that we don't step on each other too much. + size = Align(size, PointerSize); + + lock (_lock) { - // not enough memory - AllocateNewPage(); - } + if (BufferSize - _position < size) + { + // not enough memory + AllocateNewPage(); + } - // allocated before the position is changed - var span = _current.Span.Slice(_position, size); + position = _position; + current = _current; - addr = (uint)(_position + (_pages.Count - 1) * BufferSize); - _position += size; + // Amend _position so that it can be used by other threads. + _position += size; + } - return span; + // allocated before the position is changed + addr = current.Raw + (UIntPtr)position; + return current.Span.Slice(position, size); } public void Dispose() @@ -513,27 +547,42 @@ public void Describe(TextWriter text, Key.Predicate? predicate = null) static string S(in NibblePath full) => full.UnsafeAsKeccak.ToString(); } - private readonly struct Root(Page[] pages) + [StructLayout(LayoutKind.Explicit, Size = SizeOf)] + private readonly struct Root { + [FieldOffset(0)] + private readonly Page _pages; + + public Root(Page[] pages) + { + Debug.Assert(pages.Length == PageCount); + pages.CopyTo(MemoryMarshal.CreateSpan(ref _pages, PageCount)); + } + + /// + /// The size of this structure. + /// + private const int SizeOf = PageCount * PointerSize; + /// - /// 16gives 4kb * 16, 64kb allocated per dictionary. - /// This gives 16k buckets which should be sufficient to have a really low ratio of collisions for majority of the blocks. + /// The total number of pages used by the root construct. The bigger, the bigger fanout it is. /// - public const int PageCount = 16; + public const int PageCount = 32; public static readonly int BucketCountLog2 = BitOperations.Log2(BucketCount); public const int BucketCount = PageCount * BucketsPerPage; - private const int BucketsPerPage = Page.PageSize / sizeof(uint); + private const int BucketsPerPage = Page.PageSize / PointerSize; private const int InPageMask = BucketsPerPage - 1; private static readonly int PageShift = BitOperations.Log2(BucketsPerPage); - public unsafe ref uint this[int bucket] + public unsafe ref UIntPtr this[int bucket] { get { - var raw = Unsafe.Add(ref MemoryMarshal.GetArrayDataReference(pages), bucket >> PageShift).Raw; - return ref Unsafe.Add(ref Unsafe.AsRef(raw.ToPointer()), bucket & InPageMask); + var shift = bucket >> PageShift; + var raw = Unsafe.Add(ref Unsafe.AsRef(in _pages), shift).Raw; + return ref Unsafe.Add(ref Unsafe.AsRef(raw.ToPointer()), bucket & InPageMask); } } }