From ac1f9c66e790b25294e9ee0ef98928e33eff6367 Mon Sep 17 00:00:00 2001 From: Nodari Chkuaselidze Date: Wed, 27 Dec 2023 15:39:00 +0400 Subject: [PATCH] node: add fullLock option to the interactive rescan. Interactive rescan by default does per block scan lock. This enables parallel rescans, as well as chain sync while rescan is in progress. But in specific cases, it may be more beneficial to stop the node from syncing while the rescan is in progress. --- CHANGELOG.md | 2 +- lib/blockchain/chain.js | 40 +++++++++++++++-- lib/client/node.js | 5 ++- lib/node/fullnode.js | 6 ++- lib/node/http.js | 8 ++-- test/node-rescan-test.js | 96 ++++++++++++++++++++++++++++++++++++---- 6 files changed, 138 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7dd7d77a1..4478c72e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,7 +31,7 @@ process and allows parallel rescans. - `compactInterval` - what is the current compaction interval config. - `nextCompaction` - when will the next compaction trigger after restart. - `lastCompaction` - when was the last compaction run. - - Introduce `scan interactive` hook (start, filter) + - Introduce `scan interactive` hook (start, filter, fullLock) ### Node HTTP Client: - Introduce `scanInteractive` method that starts interactive rescan. diff --git a/lib/blockchain/chain.js b/lib/blockchain/chain.js index aa5bb4c5a..f537143e6 100644 --- a/lib/blockchain/chain.js +++ b/lib/blockchain/chain.js @@ -2273,9 +2273,39 @@ class Chain extends AsyncEmitter { * @param {BloomFilter} filter - Starting bloom filter containing tx, * address and name hashes. * @param {Function} iter - Iterator. + * @param {Boolean} [fullLock=false] + * @returns {Promise} + */ + + async scanInteractive(start, filter, iter, fullLock = false) { + if (fullLock) { + const unlock = await this.locker.lock(); + try { + // We lock the whole chain, no longer lock per block scan. + return await this._scanInteractive(start, filter, iter, false); + } catch (e) { + this.logger.debug('Scan(interactive) errored. Error: %s', e.message); + throw e; + } finally { + unlock(); + } + } + + return this._scanInteractive(start, filter, iter, true); + } + + /** + * Interactive scan the blockchain for transactions containing specified + * address hashes. Allows repeat and abort. + * @param {Hash|Number} start - Block hash or height to start at. + * @param {BloomFilter} filter - Starting bloom filter containing tx, + * address and name hashes. + * @param {Function} iter - Iterator. + * @param {Boolean} [lockPerScan=true] - if we should lock per block scan. + * @returns {Promise} */ - async scanInteractive(start, filter, iter) { + async _scanInteractive(start, filter, iter, lockPerScan = true) { if (start == null) start = this.network.genesis.hash; @@ -2287,7 +2317,10 @@ class Chain extends AsyncEmitter { let hash = start; while (hash != null) { - const unlock = await this.locker.lock(); + let unlock; + + if (lockPerScan) + unlock = await this.locker.lock(); try { const {entry, txs} = await this.db.scanBlock(hash, filter); @@ -2333,7 +2366,8 @@ class Chain extends AsyncEmitter { this.logger.debug('Scan(interactive) errored. Error: %s', e.message); throw e; } finally { - unlock(); + if (lockPerScan) + unlock(); } } } diff --git a/lib/client/node.js b/lib/client/node.js index 4bf1ae02d..6cb20c8d1 100644 --- a/lib/client/node.js +++ b/lib/client/node.js @@ -339,16 +339,17 @@ class NodeClient extends Client { * Rescan for any missed transactions. (Interactive) * @param {Number|Hash} start - Start block. * @param {BloomFilter} [filter] + * @param {Boolean} [fullLock=false] * @returns {Promise} */ - rescanInteractive(start, filter = null) { + rescanInteractive(start, filter = null, fullLock = false) { if (start == null) start = 0; assert(typeof start === 'number' || Buffer.isBuffer(start)); - return this.call('rescan interactive', start, filter); + return this.call('rescan interactive', start, filter, fullLock); } } diff --git a/lib/node/fullnode.js b/lib/node/fullnode.js index de7fcc2b0..0e8bbdcb8 100644 --- a/lib/node/fullnode.js +++ b/lib/node/fullnode.js @@ -369,11 +369,13 @@ class FullNode extends Node { * @param {Number|Hash} start - Start block. * @param {BloomFilter} filter * @param {Function} iter - Iterator. + * @param {Boolean} [fullLock=false] - lock the whole chain instead of per + * scan. * @returns {Promise} */ - scanInteractive(start, filter, iter) { - return this.chain.scanInteractive(start, filter, iter); + scanInteractive(start, filter, iter, fullLock = false) { + return this.chain.scanInteractive(start, filter, iter, fullLock); } /** diff --git a/lib/node/http.js b/lib/node/http.js index e82b482bd..93277edb2 100644 --- a/lib/node/http.js +++ b/lib/node/http.js @@ -712,6 +712,7 @@ class HTTP extends Server { const valid = new Validator(args); const start = valid.uintbhash(0); const rawFilter = valid.buf(1); + const fullLock = valid.bool(2, false); let filter = socket.filter; if (start == null) @@ -720,7 +721,7 @@ class HTTP extends Server { if (rawFilter) filter = BloomFilter.fromRaw(rawFilter); - return this.scanInteractive(socket, start, filter); + return this.scanInteractive(socket, start, filter, fullLock); }); } @@ -859,10 +860,11 @@ class HTTP extends Server { * @param {WebSocket} socket * @param {Hash} start * @param {BloomFilter} filter + * @param {Boolean} [fullLock=false] * @returns {Promise} */ - async scanInteractive(socket, start, filter) { + async scanInteractive(socket, start, filter, fullLock = false) { const iter = async (entry, txs) => { const block = entry.encode(); const raw = []; @@ -921,7 +923,7 @@ class HTTP extends Server { }; try { - await this.node.scanInteractive(start, filter, iter); + await this.node.scanInteractive(start, filter, iter, fullLock); } catch (err) { return socket.call('block rescan interactive abort', err.message); } diff --git a/test/node-rescan-test.js b/test/node-rescan-test.js index 3d6fca10c..86584b7b2 100644 --- a/test/node-rescan-test.js +++ b/test/node-rescan-test.js @@ -399,20 +399,57 @@ describe('Node Rescan Interactive API', function() { node.scanInteractive(startHeight, null, getIter(counter2)) ]); - assert.strictEqual(counter1.count, 10); - assert.strictEqual(counter2.count, 10); + assert.strictEqual(counter1.count, RESCAN_DEPTH); + assert.strictEqual(counter2.count, RESCAN_DEPTH); - // Chain gets locked per block, so we should see alternating events. + // Chain gets locked per block by default, so we should see alternating events. // Because they start in parallel, but id1 starts first they will be // getting events in alternating older (first one gets lock, second waits, // second gets lock, first waits, etc.) - for (let i = 0; i < 10; i++) { + for (let i = 0; i < RESCAN_DEPTH; i++) { assert.strictEqual(events[i].id, 1); assert.strictEqual(events[i + 1].id, 2); i++; } }); + it('should rescan in series', async () => { + const {node} = nodeCtx; + const startHeight = nodeCtx.height - RESCAN_DEPTH + 1; + + const events = []; + const getIter = (counterObj) => { + return async (entry, txs) => { + assert.strictEqual(entry.height, startHeight + counterObj.count); + assert.strictEqual(txs.length, 4); + + events.push({ ...counterObj }); + counterObj.count++; + + return { + type: scanActions.NEXT + }; + }; + }; + + const counter1 = { id: 1, count: 0 }; + const counter2 = { id: 2, count: 0 }; + await Promise.all([ + node.scanInteractive(startHeight, null, getIter(counter1), true), + node.scanInteractive(startHeight, null, getIter(counter2), true) + ]); + + assert.strictEqual(counter1.count, RESCAN_DEPTH); + assert.strictEqual(counter2.count, RESCAN_DEPTH); + + // We lock the whole chain for this test, so we should see events + // from one to other. + for (let i = 0; i < RESCAN_DEPTH; i++) { + assert.strictEqual(events[i].id, 1); + assert.strictEqual(events[i + RESCAN_DEPTH].id, 2); + } + }); + describe('HTTP', function() { let client = null; @@ -456,7 +493,7 @@ describe('Node Rescan Interactive API', function() { filter = test.filter.encode(); await client.rescanInteractive(startHeight, filter); - assert.strictEqual(count, 10); + assert.strictEqual(count, RESCAN_DEPTH); count = 0; if (test.filter) @@ -757,20 +794,63 @@ describe('Node Rescan Interactive API', function() { client2.rescanInteractive(startHeight) ]); - assert.strictEqual(counter1.count, 10); - assert.strictEqual(counter2.count, 10); + assert.strictEqual(counter1.count, RESCAN_DEPTH); + assert.strictEqual(counter2.count, RESCAN_DEPTH); // Chain gets locked per block, so we should see alternating events. // Because they start in parallel, but id1 starts first they will be // getting events in alternating older (first one gets lock, second waits, // second gets lock, first waits, etc.) - for (let i = 0; i < 10; i++) { + for (let i = 0; i < RESCAN_DEPTH; i++) { assert.strictEqual(events[i].id, 1); assert.strictEqual(events[i + 1].id, 2); i++; } }); + it('should rescan in series', async () => { + const client2 = nodeCtx.nodeClient(); + await client2.open(); + + const startHeight = nodeCtx.height - RESCAN_DEPTH + 1; + const events = []; + const counter1 = { id: 1, count: 0 }; + const counter2 = { id: 2, count: 0 }; + + const getIter = (counterObj) => { + return async (rawEntry, rawTXs) => { + const [entry, txs] = parseBlock(rawEntry, rawTXs); + assert.strictEqual(entry.height, startHeight + counterObj.count); + assert.strictEqual(txs.length, 4); + + events.push({ ...counterObj }); + counterObj.count++; + + return { + type: scanActions.NEXT + }; + }; + }; + + client.hook('block rescan interactive', getIter(counter1)); + client2.hook('block rescan interactive', getIter(counter2)); + + await Promise.all([ + client.rescanInteractive(startHeight, null, true), + client2.rescanInteractive(startHeight, null, true) + ]); + + assert.strictEqual(counter1.count, RESCAN_DEPTH); + assert.strictEqual(counter2.count, RESCAN_DEPTH); + + // We lock the whole chain for this test, so we should see events + // from one to other. + for (let i = 0; i < RESCAN_DEPTH; i++) { + assert.strictEqual(events[i].id, 1); + assert.strictEqual(events[i + RESCAN_DEPTH].id, 2); + } + }); + // Make sure the client closing does not cause the chain locker to get // indefinitely locked. (https://github.com/bcoin-org/bsock/pull/11) it('should stop rescan when client closes', async () => {