Skip to content

Commit

Permalink
Use stream instead of exponential backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
Powersource committed Oct 23, 2023
1 parent 05b4edf commit 9d10c47
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 31 deletions.
81 changes: 55 additions & 26 deletions lib/epochs.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// SPDX-License-Identifier: LGPL-3.0-only

const { promisify: p } = require('util')
const { backOff } = require('exponential-backoff')
const { fromMessageSigil } = require('ssb-uri2')
const pull = require('pull-stream')
const pullDefer = require('pull-defer')
Expand All @@ -13,7 +12,14 @@ const OverwriteFields = require('@tangle/overwrite-fields')
const clarify = require('clarify-error')
const Butt64 = require('butt64')
const isCanonicalBase64 = require('is-canonical-base64')
const { where, and, type, live, toPullStream } = require('ssb-db2/operators')
const {
where,
and,
type,
live,
isDecrypted,
toPullStream,
} = require('ssb-db2/operators')
const {
validator: {
group: {
Expand All @@ -29,6 +35,7 @@ const isSubsetOf = require('set.prototype.issubsetof')

const { groupRecp } = require('./operators')
const getTangleUpdates = require('./tangles/get-tangle-updates')
const pullMany = require('pull-many')

const msgPattern = toPattern(new Butt64('ssb:message/[a-zA-Z0-9-]+/', null, 32))
const feedPattern = toPattern(new Butt64('ssb:feed/[a-zA-Z0-9-]+/', null, 32))
Expand Down Expand Up @@ -512,21 +519,45 @@ function epochNodeStream(ssb, groupId, opts = {}) {

return deferredSource
}
function getRepeat(ssb, key) {
return backOff(
() =>
new Promise((res, rej) => {
ssb.db.get(key, (err, rootVal) => {
if (err) {
return rej(err)
} else {
return res(rootVal)
}
})
}),
{ numOfAttempts: 5 }

function getRootVal(ssb, msgId, cb) {
console.log('looking for msgId', msgId)
pull(
pullMany([
ssb.db.query(
// TODO: optimize with an equal() op or something
where(isDecrypted('box2')),
live({ old: true }),
toPullStream()
),
ssb.db.reindexed(),
]),
//pull.through((msg) => console.log('about to filter', msg)),
pull.filter((msg) => fromMessageSigil(msg.key) === msgId),
pull.take(1),
pull.drain(
(msg) => cb(null, msg.value),
(err) => {
if (err) cb(Error('Failed getting root msg async', { cause: err }))
}
)
)

//return backOff(
// () =>
// new Promise((res, rej) => {
// ssb.db.get(msgId, (err, rootVal) => {
// if (err) {
// return rej(err)
// } else {
// return res(rootVal)
// }
// })
// }),
// { numOfAttempts: 5 }
//)
}

function getGroupInit(ssb, groupId, cb) {
pull(
ssb.box2.getGroupInfoUpdates(groupId),
Expand All @@ -536,20 +567,18 @@ function getGroupInit(ssb, groupId, cb) {
if (!info) return cb(new Error('Unknown group'))

// Fetch the tangle root
// Repeating since sometimes the group info comes in very quick, before the root msg has had time to get put into the db
// This is based on a live stream since sometimes the group info comes in very quick, before the root msg has had time to get put into the db
// and sometimes it might take ages (we haven't gotten that feed yet)
getRepeat(ssb, info.root)
.then((rootVal) => {
if (!isInitRoot(rootVal))
// prettier-ignore
return cb(clarify(new Error(isInitRoot.string), 'Malformed group/init root message'))
getRootVal(ssb, info.root, (err, rootVal) => {
// prettier-ignore
if (err) return cb(clarify(err, 'Failed to load group root with id ' + info.root))

cb(null, { key: info.root, value: rootVal })
})
.catch((err) => {
if (!isInitRoot(rootVal))
// prettier-ignore
if (err) return cb(clarify(err, 'Failed to load group root with id ' + info.root))
})
return cb(clarify(new Error(isInitRoot.string), 'Malformed group/init root message'))

cb(null, { key: info.root, value: rootVal })
})
},
(err) => {
// prettier-ignore
Expand Down
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
"debug": "^4.3.4",
"envelope-js": "^1.3.2",
"envelope-spec": "^1.1.1",
"exponential-backoff": "^3.1.1",
"is-canonical-base64": "^1.1.1",
"jitdb": "^7.0.7",
"lodash.chunk": "^4.2.0",
Expand Down
8 changes: 4 additions & 4 deletions test/lib/epochs.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ test('lib/epochs (getEpochs, getMembers)', async (t) => {
async function sync(label) {
return run(`(sync ${label})`, replicate(peers), { isTest: false })
}
t.teardown(() => peers.forEach((peer) => peer.close(true)))
t.teardown(() => Promise.all(peers.map((peer) => p(peer.close)(true))))

const [aliceId, bobId, oscarId] = await getRootIds(peers)
await run(
Expand Down Expand Up @@ -153,7 +153,7 @@ test('lib/epochs (getMissingMembers)', async (t) => {
{ isTest: false }
)
}
t.teardown(() => peers.forEach((peer) => peer.close(true)))
t.teardown(() => Promise.all(peers.map((peer) => p(peer.close)(true))))

await run(
'start tribes',
Expand Down Expand Up @@ -260,7 +260,7 @@ test('lib/epochs (getPreferredEpoch - 4.4. same membership)', async (t) => {
Server({ name: 'bob' }),
Server({ name: 'oscar' }),
]
t.teardown(() => peers.forEach((peer) => peer.close(true)))
t.teardown(() => Promise.all(peers.map((peer) => p(peer.close)(true))))

const [alice, bob, oscar] = peers
const [bobId, oscarId] = await getRootIds([bob, oscar])
Expand Down Expand Up @@ -381,7 +381,7 @@ test('lib/epochs (getPreferredEpoch - 4.5. subset membership)', async (t) => {
Server({ name: 'carol' }),
Server({ name: 'oscar' }),
]
t.teardown(() => peers.forEach((peer) => peer.close(true)))
t.teardown(() => Promise.all(peers.map((peer) => p(peer.close)(true))))

const [alice, bob, carol, oscar] = peers
const [bobId, carolId, oscarId] = await getRootIds([bob, carol, oscar])
Expand Down

0 comments on commit 9d10c47

Please sign in to comment.