Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Faster sort #123

Merged
merged 8 commits into from
Feb 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 43 additions & 5 deletions benchmark/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -334,30 +334,68 @@ test('load two indexes concurrently', (t) => {
})
})

test('paginate big index with small pageSize', (t) => {
  // Benchmark: stream a 20k-message 'post' index in pages of 5 messages
  // and record the total traversal time in the markdown report.
  const TOTAL = 20000
  const PAGESIZE = 5
  const NUMPAGES = TOTAL / PAGESIZE
  db.onReady(() => {
    const start = Date.now()
    let pagesRead = 0
    pull(
      query(
        fromDB(db),
        and(equal(seekType, 'post', { indexType: 'type' })),
        paginate(PAGESIZE),
        toPullStream()
      ),
      pull.take(NUMPAGES),
      pull.drain(
        () => {
          pagesRead += 1
        },
        (err) => {
          if (err) t.fail(err)
          const duration = Date.now() - start
          if (pagesRead !== NUMPAGES)
            t.fail('wrong number of pages read: ' + pagesRead)
          t.pass(`duration: ${duration}ms`)
          fs.appendFileSync(
            reportPath,
            `| Paginate ${TOTAL} msgs with pageSize=${PAGESIZE} | ${duration}ms |\n`
          )
          t.end()
        }
      )
    )
  })
})

test('paginate big index with big pageSize', (t) => {
  // Same traversal as the small-pageSize benchmark, but 500 msgs per page,
  // so far fewer (40) pulls are made against the paginated stream.
  const TOTAL = 20000
  const PAGESIZE = 500
  const NUMPAGES = TOTAL / PAGESIZE
  db.onReady(() => {
    const start = Date.now()
    let pagesRead = 0
    pull(
      query(
        fromDB(db),
        and(equal(seekType, 'post', { indexType: 'type' })),
        paginate(PAGESIZE),
        toPullStream()
      ),
      pull.take(NUMPAGES),
      pull.drain(
        () => {
          pagesRead += 1
        },
        (err) => {
          if (err) t.fail(err)
          const duration = Date.now() - start
          if (pagesRead !== NUMPAGES)
            t.fail('wrong number of pages read: ' + pagesRead)
          t.pass(`duration: ${duration}ms`)
          fs.appendFileSync(
            reportPath,
            `| Paginate ${TOTAL} msgs with pageSize=${PAGESIZE} | ${duration}ms |\n`
          )
          t.end()
        }
      )
    )
  })
})
Expand Down
51 changes: 33 additions & 18 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const pullAsync = require('pull-async')
const TypedFastBitSet = require('typedfastbitset')
const bsb = require('binary-search-bounds')
const multicb = require('multicb')
const FastPriorityQueue = require('fastpriorityqueue')
const debug = require('debug')('jitdb')
const debugQuery = debug.extend('query')
const Status = require('./status')
Expand Down Expand Up @@ -1000,22 +1001,30 @@ module.exports = function (log, indexesPath) {
})
}

// FastPriorityQueue comparator for ascending order: returns true when
// `a` should be dequeued before `b` (i.e. `a` has the older timestamp).
function compareAscending(a, b) {
  return a.timestamp < b.timestamp
}

// FastPriorityQueue comparator for descending order: returns true when
// `a` should be dequeued before `b` (i.e. `a` has the newer timestamp).
function compareDescending(a, b) {
  return b.timestamp < a.timestamp
}

// Builds (or fetches from cache) a FastPriorityQueue of { seq, timestamp }
// entries for every seq in `bitset`, ordered by declared timestamp.
// The queue is memoized per direction and per bitset (object identity)
// in `sortedCache`, so callers must not consume the returned queue
// destructively without cloning it first.
function sortedByTimestamp(bitset, descending) {
  updateCacheWithLog()
  const order = descending ? 'descending' : 'ascending'
  const cache = sortedCache[order]
  if (cache.has(bitset)) return cache.get(bitset)

  const comparator = descending ? compareDescending : compareAscending
  const fpq = new FastPriorityQueue(comparator)
  for (const seq of bitset.array()) {
    fpq.add({ seq, timestamp: indexes['timestamp'].tarr[seq] })
  }
  fpq.trim() // release spare backing-array capacity before caching

  cache.set(bitset, fpq)
  return fpq
}

// Streams one page of messages for `bitset`: skips the first `seq` entries
// of the timestamp-sorted order, takes at most `limit`, and delivers
// { results, total } to `cb`, where `total` is the full result-set size.
// NOTE(review): this is a diff excerpt — the parameter list and part of the
// body are hidden behind "Expand" folds, and lines of the pre-change
// implementation are interleaved below; only comments are added here.
function getMessagesFromBitsetSlice(
Expand All @@ -1028,13 +1037,19 @@
) {
// Treat a missing/null seq as "start from the first result".
seq = seq || 0

// --- pre-change implementation (removed lines): sorted array + slice ---
const sorted = sortedByTimestamp(bitset, descending)
const sliced =
limit != null
? sorted.slice(seq, seq + limit)
: seq > 0
? sorted.slice(seq)
: sorted
// --- post-change implementation (added lines): cached priority queue ---
let sorted = sortedByTimestamp(bitset, descending)
const resultSize = sorted.size

let sliced
if (seq === 0 && limit === 1) {
// Fast path: the first page of size 1 is just the queue head.
// NOTE(review): on an empty bitset, peek() is undefined so this yields
// [undefined] where the old slice-based code yielded [] — confirm callers.
sliced = [sorted.peek()]
} else {
if (seq > 0) {
// Clone before draining so the cached queue is not consumed.
sorted = sorted.clone()
sorted.removeMany(() => true, seq)
}
// NOTE(review): limit=0 falls through to Infinity via `||` — presumably
// a zero limit never reaches this path; verify against paginate().
sliced = sorted.kSmallest(limit || Infinity)
}

push(
push.values(sliced),
Expand All @@ -1046,15 +1061,15 @@
push.collect((err, results) => {
cb(err, {
results: results,
// total reflects the whole result set, not just this page
total: resultSize,
})
})
)
}


// Counts how many results remain in `bitset` after skipping the first `seq`
// entries. With no offset this is just the bitset's cardinality.
// `descending` is unused now that counting no longer needs the sorted view,
// but the parameter is kept so existing call sites stay valid.
function countBitsetSlice(bitset, seq, descending) {
  if (!seq) return bitset.size()
  // Clamp at zero: an offset past the end of the results must not produce a
  // negative count (the old slice-based code returned 0 in that case).
  else return Math.max(0, bitset.size() - seq)
}

function paginate(operation, seq, limit, descending, onlyOffset, cb) {
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"binary-search-bounds": "^2.0.4",
"bipf": "^1.4.0",
"debug": "^4.2.0",
"fastpriorityqueue": "^0.6.3",
"idb-kv-store": "^4.5.0",
"jsesc": "^3.0.2",
"mkdirp": "^1.0.4",
Expand Down
24 changes: 23 additions & 1 deletion test/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,29 @@ prepareAndRunTest('count operator toCallback', dir, (t, db, raf) => {
})
})

prepareAndRunTest('count with seq operator toCallback', dir, (t, db, raf) => {
  // Two messages match type 'food'; startFrom(1) skips one of them, so the
  // total delivered by count() to the callback must be exactly 1.
  const content = { type: 'food', text: 'Lunch' }
  let s = validate.initial()
  s = validate.appendNew(s, null, alice, content, Date.now())
  s = validate.appendNew(s, null, bob, content, Date.now() + 1)

  addMsg(s.queue[0].value, raf, () => {
    addMsg(s.queue[1].value, raf, () => {
      query(
        fromDB(db),
        and(slowEqual('value.content.type', 'food')),
        startFrom(1),
        count(),
        toCallback((err, total) => {
          t.error(err, 'no error')
          t.equal(total, 1)
          t.end()
        })
      )
    })
  })
})

// count() over a pull-stream should emit a single value equal to the number
// of matching messages (2).
// NOTE(review): diff excerpt — the middle of this test is hidden behind the
// "Expand" fold; only comments are added here.
prepareAndRunTest('count operator toPullStream', dir, (t, db, raf) => {
const msg = { type: 'drink', text: 'Juice' }
let state = validate.initial()
Expand All @@ -631,7 +654,6 @@ prepareAndRunTest('count operator toPullStream', dir, (t, db, raf) => {
),
pull.collect((err, results) => {
t.error(err, 'no error')
// removed by this diff: stray debug logging
console.log(results)
t.equal(results.length, 1)
t.equal(results[0], 2)
t.end()
Expand Down
31 changes: 18 additions & 13 deletions test/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -709,10 +709,12 @@ prepareAndRunTest('Timestamp discontinuity', dir, (t, db, raf) => {
// Messages are appended with *declared* timestamps in reverse order
// (msg1 newest, msg3 oldest) to exercise sorting by declared time.
// NOTE(review): diff excerpt — the test's opening line and part of
// `authorQuery` are hidden behind folds, and removed lines are interleaved
// with their replacements below; only comments are added here.
const msg2 = { type: 'post', text: '2nd' }
const msg3 = { type: 'post', text: '3rd' }

// added: capture a single base time so all declared timestamps share it
const start = Date.now()

let state = validate.initial()
// removed lines (each called Date.now() separately):
state = validate.appendNew(state, null, keys, msg1, Date.now() + 3000)
state = validate.appendNew(state, null, keys, msg2, Date.now() + 2000)
state = validate.appendNew(state, null, keys, msg3, Date.now() + 1000)
// added replacements:
state = validate.appendNew(state, null, keys, msg1, start + 3000)
state = validate.appendNew(state, null, keys, msg2, start + 2000)
state = validate.appendNew(state, null, keys, msg3, start + 1000)

const authorQuery = {
type: 'EQUAL',
Expand All @@ -724,17 +726,20 @@
},
}

// removed: the old expectations (1st, 2nd, 3rd) — presumably arrival time
// used to dominate the sort; confirm against the timestamp index semantics
addMsg(state.queue[0].value, raf, (err, m1) => {
addMsg(state.queue[1].value, raf, (err, m2) => {
addMsg(state.queue[2].value, raf, (err, m3) => {
db.all(authorQuery, 0, false, false, (err, results) => {
t.equal(results.length, 3)
t.equal(results[0].value.content.text, '1st', '1st ok')
t.equal(results[1].value.content.text, '2nd', '2nd ok')
t.equal(results[2].value.content.text, '3rd', '3rd ok')
t.end()
// added: delay ingestion so the declared timestamps (not arrival times)
// determine the ascending result order: 3rd (oldest) … 1st (newest)
// we need to wait for the declared timestamps to win over arrival
setTimeout(() => {
addMsg(state.queue[0].value, raf, (err, m1) => {
addMsg(state.queue[1].value, raf, (err, m2) => {
addMsg(state.queue[2].value, raf, (err, m3) => {
db.all(authorQuery, 0, false, false, (err, results) => {
t.equal(results.length, 3)
t.equal(results[0].value.content.text, '3rd', '3rd ok')
t.equal(results[1].value.content.text, '2nd', '2nd ok')
t.equal(results[2].value.content.text, '1st', '1st ok')
t.end()
})
})
})
})
})
}, 3000)
})