From c08bc29f3334eef3461276e71d82286d683f9c1f Mon Sep 17 00:00:00 2001 From: Anders Rune Jensen Date: Fri, 19 Feb 2021 00:14:10 +0100 Subject: [PATCH 1/7] Faster sort using a Priority Queue --- index.js | 48 ++++++++++++++++++++++++++++++------------------ package.json | 1 + 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/index.js b/index.js index 466099c..25a2a13 100644 --- a/index.js +++ b/index.js @@ -7,6 +7,7 @@ const pullAsync = require('pull-async') const TypedFastBitSet = require('typedfastbitset') const bsb = require('binary-search-bounds') const multicb = require('multicb') +const FastPriorityQueue = require('fastpriorityqueue') const debug = require('debug')('jitdb') const debugQuery = debug.extend('query') const Status = require('./status') @@ -1000,22 +1001,30 @@ module.exports = function (log, indexesPath) { }) } + function compareAscending(a, b) { + return b.timestamp > a.timestamp + } + + function compareDescending(a, b) { + return a.timestamp > b.timestamp + } + function sortedByTimestamp(bitset, descending) { updateCacheWithLog() const order = descending ? 'descending' : 'ascending' - if (sortedCache[order].has(bitset)) return sortedCache[order].get(bitset) - const timestamped = bitset.array().map((seq) => { - return { + if (sortedCache[order].has(bitset)) + return sortedCache[order].get(bitset).clone() + const fpq = new FastPriorityQueue( + descending ? compareDescending : compareAscending + ) + bitset.array().forEach((seq) => { + fpq.add({ seq, timestamp: indexes['timestamp'].tarr[seq], - } - }) - const sorted = timestamped.sort((a, b) => { - if (descending) return b.timestamp - a.timestamp - else return a.timestamp - b.timestamp + }) }) - sortedCache[order].set(bitset, sorted) - return sorted + sortedCache[order].set(bitset, fpq.clone()) + return fpq } function getMessagesFromBitsetSlice( @@ -1029,12 +1038,15 @@ module.exports = function (log, indexesPath) { seq = seq || 0 const sorted = sortedByTimestamp(bitset, descending) - const sliced = - limit != null - ? sorted.slice(seq, seq + limit) - : seq > 0 - ? sorted.slice(seq) - : sorted + const resultSize = sorted.size + + const sliced = [] + for (var i = 0; i < seq; ++i) sorted.poll() + + while (!sorted.isEmpty()) { + sliced.push(sorted.poll()) + if (limit !== null && sliced.length == limit) break + } push( push.values(sliced), @@ -1046,7 +1058,7 @@ module.exports = function (log, indexesPath) { push.collect((err, results) => { cb(err, { results: results, - total: sorted.length, + total: resultSize, }) }) ) @@ -1054,7 +1066,7 @@ module.exports = function (log, indexesPath) { function countBitsetSlice(bitset, seq, descending) { if (!seq) return bitset.size() - else return sortedByTimestamp(bitset, descending).slice(seq).length + else return } function paginate(operation, seq, limit, descending, onlyOffset, cb) { diff --git a/package.json b/package.json index 5ebffb0..a8c6576 100644 --- a/package.json +++ b/package.json @@ -12,6 +12,7 @@ "binary-search-bounds": "^2.0.4", "bipf": "^1.4.0", "debug": "^4.2.0", + "fastpriorityqueue": "^0.6.3", "idb-kv-store": "^4.5.0", "jsesc": "^3.0.2", "mkdirp": "^1.0.4", From 5d8dfaa82d23d2930b73b1dc226c54fa630f99a8 Mon Sep 17 00:00:00 2001 From: Anders Rune Jensen Date: Fri, 19 Feb 2021 00:43:31 +0100 Subject: [PATCH 2/7] Fix count + slice --- index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/index.js b/index.js index 25a2a13..97b4cf4 100644 --- a/index.js +++ b/index.js @@ -1066,7 +1066,7 @@ module.exports = function (log, indexesPath) { function countBitsetSlice(bitset, seq, descending) { if (!seq) return bitset.size() - else return + else return bitset.size() - seq } function paginate(operation, seq, limit, descending, onlyOffset, cb) { From bdd3b3a840107d5ad83100cbb983184817ddd6a7 Mon Sep 17 00:00:00 2001 From: Anders Rune Jensen Date: Fri, 19 Feb 2021 08:11:57 +0100 Subject: [PATCH 3/7] Add test for count + seq --- test/operators.js | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/test/operators.js b/test/operators.js index 7f3412a..199e380 100644 --- a/test/operators.js +++ b/test/operators.js @@ -614,6 +614,29 @@ prepareAndRunTest('count operator toCallback', dir, (t, db, raf) => { }) }) +prepareAndRunTest('count with seq operator toCallback', dir, (t, db, raf) => { + const msg = { type: 'food', text: 'Lunch' } + let state = validate.initial() + state = validate.appendNew(state, null, alice, msg, Date.now()) + state = validate.appendNew(state, null, bob, msg, Date.now() + 1) + + addMsg(state.queue[0].value, raf, (e1, msg1) => { + addMsg(state.queue[1].value, raf, (e2, msg2) => { + query( + fromDB(db), + and(slowEqual('value.content.type', 'food')), + startFrom(1), + count(), + toCallback((err, total) => { + t.error(err, 'no error') + t.equal(total, 1) + t.end() + }) + ) + }) + }) +}) + prepareAndRunTest('count operator toPullStream', dir, (t, db, raf) => { const msg = { type: 'drink', text: 'Juice' } let state = validate.initial() @@ -631,7 +654,6 @@ prepareAndRunTest('count operator toPullStream', dir, (t, db, raf) => { ), pull.collect((err, results) => { t.error(err, 'no error') - console.log(results) t.equal(results.length, 1) t.equal(results[0], 2) t.end() From 50e330d7cf18726a60d4408264e04ac86dd141ad Mon Sep 17 00:00:00 2001 From: Anders Rune Jensen Date: Fri, 19 Feb 2021 08:24:13 +0100 Subject: [PATCH 4/7] Make tests less flaky --- test/query.js | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/test/query.js b/test/query.js index 35c868c..34ea0b3 100644 --- a/test/query.js +++ b/test/query.js @@ -709,10 +709,12 @@ prepareAndRunTest('Timestamp discontinuity', dir, (t, db, raf) => { const msg2 = { type: 'post', text: '2nd' } const msg3 = { type: 'post', text: '3rd' } + const start = Date.now() + let state = validate.initial() - state = validate.appendNew(state, null, keys, msg1, Date.now() + 3000) - state = validate.appendNew(state, null, keys, msg2, Date.now() + 2000) - state = validate.appendNew(state, null, keys, msg3, Date.now() + 1000) + state = validate.appendNew(state, null, keys, msg1, start + 3000) + state = validate.appendNew(state, null, keys, msg2, start + 2000) + state = validate.appendNew(state, null, keys, msg3, start + 1000) const authorQuery = { type: 'EQUAL', From 457bcc0b379baeea154276fbc2b23d9c1a5a1509 Mon Sep 17 00:00:00 2001 From: Anders Rune Jensen Date: Fri, 19 Feb 2021 08:38:12 +0100 Subject: [PATCH 5/7] Fix timestamp discontinuity tests --- test/query.js | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/test/query.js b/test/query.js index 34ea0b3..ba61832 100644 --- a/test/query.js +++ b/test/query.js @@ -726,17 +726,20 @@ prepareAndRunTest('Timestamp discontinuity', dir, (t, db, raf) => { }, } - addMsg(state.queue[0].value, raf, (err, m1) => { - addMsg(state.queue[1].value, raf, (err, m2) => { - addMsg(state.queue[2].value, raf, (err, m3) => { - db.all(authorQuery, 0, false, false, (err, results) => { - t.equal(results.length, 3) - t.equal(results[0].value.content.text, '1st', '1st ok') - t.equal(results[1].value.content.text, '2nd', '2nd ok') - t.equal(results[2].value.content.text, '3rd', '3rd ok') - t.end() + // we need to wait for the declared timestamps to win over arrival + setTimeout(() => { + addMsg(state.queue[0].value, raf, (err, m1) => { + addMsg(state.queue[1].value, raf, (err, m2) => { + addMsg(state.queue[2].value, raf, (err, m3) => { + db.all(authorQuery, 0, false, false, (err, results) => { + t.equal(results.length, 3) + t.equal(results[0].value.content.text, '3rd', '3rd ok') + t.equal(results[1].value.content.text, '2nd', '2nd ok') + t.equal(results[2].value.content.text, '1st', '1st ok') + t.end() + }) }) }) }) - }) + }, 3000) }) From a1b5dfb4f96ecbde7be306b6b1007d296ab247c3 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Sat, 20 Feb 2021 14:23:57 +0200 Subject: [PATCH 6/7] tweak usage of fastpriorityqueue --- index.js | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/index.js b/index.js index 97b4cf4..d068e8c 100644 --- a/index.js +++ b/index.js @@ -1012,8 +1012,7 @@ module.exports = function (log, indexesPath) { function sortedByTimestamp(bitset, descending) { updateCacheWithLog() const order = descending ? 'descending' : 'ascending' - if (sortedCache[order].has(bitset)) - return sortedCache[order].get(bitset).clone() + if (sortedCache[order].has(bitset)) return sortedCache[order].get(bitset) const fpq = new FastPriorityQueue( descending ? compareDescending : compareAscending ) @@ -1023,7 +1022,8 @@ module.exports = function (log, indexesPath) { timestamp: indexes['timestamp'].tarr[seq], }) }) - sortedCache[order].set(bitset, fpq.clone()) + fpq.trim() + sortedCache[order].set(bitset, fpq) return fpq } @@ -1037,15 +1037,18 @@ module.exports = function (log, indexesPath) { ) { seq = seq || 0 - const sorted = sortedByTimestamp(bitset, descending) + let sorted = sortedByTimestamp(bitset, descending) const resultSize = sorted.size - const sliced = [] - for (var i = 0; i < seq; ++i) sorted.poll() - - while (!sorted.isEmpty()) { - sliced.push(sorted.poll()) - if (limit !== null && sliced.length == limit) break + let sliced + if (seq === 0 && limit === 1) { + sliced = [sorted.peek()] + } else { + if (seq > 0) { + sorted = sorted.clone() + sorted.removeMany(() => true, seq) + } + sliced = sorted.kSmallest(limit || Infinity) } push( From f5fc6144f701a1ce4da473f99919ed4e8602577a Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Sat, 20 Feb 2021 22:19:27 +0200 Subject: [PATCH 7/7] add benchmark for paginate with big pageSize --- benchmark/index.js | 48 +++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/benchmark/index.js b/benchmark/index.js index 415848d..5041dc8 100644 --- a/benchmark/index.js +++ b/benchmark/index.js @@ -334,7 +334,10 @@ test('load two indexes concurrently', (t) => { }) }) -test('paginate one huge index', (t) => { +test('paginate big index with small pageSize', (t) => { + const TOTAL = 20000 + const PAGESIZE = 5 + const NUMPAGES = TOTAL / PAGESIZE db.onReady(() => { const start = Date.now() let i = 0 @@ -342,10 +345,10 @@ test('paginate one huge index', (t) => { query( fromDB(db), and(equal(seekType, 'post', { indexType: 'type' })), - paginate(5), + paginate(PAGESIZE), toPullStream() ), - pull.take(4000), + pull.take(NUMPAGES), pull.drain( (msgs) => { i++ @@ -353,11 +356,46 @@ test('paginate one huge index', (t) => { (err) => { if (err) t.fail(err) const duration = Date.now() - start - if (i !== 4000) t.fail('wrong number of pages read: ' + i) + if (i !== NUMPAGES) t.fail('wrong number of pages read: ' + i) t.pass(`duration: ${duration}ms`) fs.appendFileSync( reportPath, - `| Paginate 1 big index | ${duration}ms |\n` + `| Paginate ${TOTAL} msgs with pageSize=${PAGESIZE} | ${duration}ms |\n` + ) + t.end() + } + ) + ) + }) +}) + +test('paginate big index with big pageSize', (t) => { + const TOTAL = 20000 + const PAGESIZE = 500 + const NUMPAGES = TOTAL / PAGESIZE + db.onReady(() => { + const start = Date.now() + let i = 0 + pull( + query( + fromDB(db), + and(equal(seekType, 'post', { indexType: 'type' })), + paginate(PAGESIZE), + toPullStream() + ), + pull.take(NUMPAGES), + pull.drain( + (msgs) => { + i++ + }, + (err) => { + if (err) t.fail(err) + const duration = Date.now() - start + if (i !== NUMPAGES) t.fail('wrong number of pages read: ' + i) + t.pass(`duration: ${duration}ms`) + fs.appendFileSync( + reportPath, + `| Paginate ${TOTAL} msgs with pageSize=${PAGESIZE} | ${duration}ms |\n` ) t.end() }