From e7a235800c5a6fb27137b30df314dbb8cc0ddab2 Mon Sep 17 00:00:00 2001 From: Sean Zellmer Date: Wed, 14 Apr 2021 22:35:56 -0500 Subject: [PATCH 1/8] feat: Add removeFeed(name|key) --- index.js | 38 ++++++++++++++++++ test/basic.js | 106 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 144 insertions(+) diff --git a/index.js b/index.js index 5d2236a..bc33bbc 100644 --- a/index.js +++ b/index.js @@ -92,6 +92,44 @@ Multifeed.prototype._addFeed = function (feed, name) { this._forwardLiveFeedAnnouncements(feed, name) } +Multifeed.prototype.removeFeed = function (nameOrKey, cb) { + if (typeof cb !== 'function') cb = function noop () {} + + var self = this + + var feed = null + var name = null + var key = null + + if (nameOrKey in self._feeds) { + name = nameOrKey + feed = self._feeds[name] + key = feed.key.toString('hex') + } else { + key = nameOrKey + feed = self._feedKeyToFeed[key] + name = Object.keys(self._feeds).find(key => self._feeds[key] === feed) + } + + delete self._feeds[name] + delete self._feedKeyToFeed[key] + + // Remove from mux offering + self._streams.forEach((mux) => { + var idx = mux._localOffer.indexOf(key) + if (idx !== -1) { + mux._localOffer.splice(idx, 1) + } + }) + + self.writerLock(function (release) { + feed.destroyStorage(function (err) { + if (err) return self.emit('error', err) + release(cb) + }) + }) +} + Multifeed.prototype.ready = function (cb) { this._ready(cb) } diff --git a/test/basic.js b/test/basic.js index ac27a57..428d016 100644 --- a/test/basic.js +++ b/test/basic.js @@ -291,6 +291,112 @@ test('close after double-open', function (t) { } }) +test('remove feed w/ name', function (t) { + t.plan(6) + + var m1 = multifeed(ram, { valueEncoding: 'json' }) + var m2 = multifeed(ram, { valueEncoding: 'json' }) + + m1.writer(function (err) { + t.error(err) + m2.writer(function (err) { + t.error(err) + var r = m1.replicate(true) + r.pipe(m2.replicate(false)).pipe(r) + .once('end', remove) + }) + }) + + function remove () { + t.equals(m1.feeds().length, 2) + var feeds = m1.feeds() + var idx = feeds.length - 1 + m1.removeFeed(idx, function (err) { + t.error(err) + check() + }) + } + + function check () { + t.equals(m1.feeds().length, 1) + t.equals(m2.feeds().length, 2) + } +}) + +test('remove feed w/ key', function (t) { + t.plan(6) + + var m1 = multifeed(ram, { valueEncoding: 'json' }) + var m2 = multifeed(ram, { valueEncoding: 'json' }) + + m1.writer(function (err) { + t.error(err) + m2.writer(function (err) { + t.error(err) + var r = m1.replicate(true) + r.pipe(m2.replicate(false)).pipe(r) + .once('end', remove) + }) + }) + + function remove () { + t.equals(m1.feeds().length, 2) + var feeds = m1.feeds() + var feed = feeds[feeds.length - 1] + var key = feed.key.toString('hex') + m1.removeFeed(key, function (err) { + t.error(err) + check() + }) + } + + function check () { + t.equals(m1.feeds().length, 1) + t.equals(m2.feeds().length, 2) + } +}) + +test('remove feed updates mux\'s knownFeeds()', function (t) { + t.plan(8) + + var m1 = multifeed(ram, { valueEncoding: 'json' }) + var m2 = multifeed(ram, { valueEncoding: 'json' }) + + m1.writer(function (err) { + t.error(err) + m2.writer(function (err) { + t.error(err) + var r = m1.replicate(true, { live: true }) + r.pipe(m2.replicate(false, { live: true })).pipe(r) + setTimeout(remove, 1000) + }) + }) + + function remove () { + var feeds = m1.feeds() + var idx = feeds.length - 1 + var feed = feeds[idx] + var mux = m1._streams[0] + var key = feed.key.toString('hex') + + // Force feed key to be available in mux localOffer. This would be true of + // future replications. + mux.offerFeeds([key]) + + // Check it exists before removing + t.equals(m1.feeds().length, 2) + t.notEquals(mux._localOffer.indexOf(key), -1) + + m1.removeFeed(idx, function (err) { + t.error(err) + // Check it was removed from only m1 + t.equals(mux._localOffer.indexOf(key), -1) + t.equals(m1.feeds().length, 1) + t.equals(m2.feeds().length, 2) + }) + } +}) + test('can provide custom encryption key', function (t) { t.plan(2) From 0166b38141306e2610c15af36fc60cf50ebc6798 Mon Sep 17 00:00:00 2001 From: Sean Zellmer Date: Sat, 17 Apr 2021 21:20:57 -0500 Subject: [PATCH 2/8] fix: Add storage index for persistent tracking of feeds w/ feed removal With the addition of `removeFeed()`, the previous paradigm of always numbering storage directories starting at `0` and incrementing with each feed no longer works as this would have stopped when it encountered a deleted storage's number. Instead now all storage directories are stored in an index file called `dirs` in a storage dir called `index`. The index is updated whenever a feed is added or removed. This index is agnostic to the name of the dirs except for the assumption that there is no commas in the name as the dir names are stored with commas delimiting the names. Directory names are still incremented numbers. To support the fact that they can be removed and so the number of feeds tracked does not match the biggest number used for the directory names, the max number used is tracked and all new feeds are given a number based on the max. In the encryption key storage test in the regression tests, the number of resources was incremented to account for the addition of the index storage. --- index.js | 89 ++++++++++++++++++++++++++++++++++++---------- test/regression.js | 2 +- 2 files changed, 72 insertions(+), 19 deletions(-) diff --git a/index.js b/index.js index bc33bbc..5a46853 100644 --- a/index.js +++ b/index.js @@ -39,6 +39,8 @@ function Multifeed (storage, opts) { // random-access-storage wrapper that wraps all hypercores in a directory // structures. (dir/0, dir/1, ...) + this._dirs = {} + this._max_dir = -1 this._storage = function (dir) { return function (name) { var s = storage @@ -114,6 +116,13 @@ Multifeed.prototype.removeFeed = function (nameOrKey, cb) { delete self._feeds[name] delete self._feedKeyToFeed[key] + // Remove from dirs index + Object.keys(self._dirs).forEach((dir) => { + if (self._dirs[dir] === feed) { + delete self._dirs[dir] + } + }) + // Remove from mux offering self._streams.forEach((mux) => { var idx = mux._localOffer.indexOf(key) @@ -125,7 +134,7 @@ Multifeed.prototype.removeFeed = function (nameOrKey, cb) { self.writerLock(function (release) { feed.destroyStorage(function (err) { if (err) return self.emit('error', err) - release(cb) + self._updateStorageIndex(function () { release(cb) }) }) }) } @@ -171,32 +180,51 @@ function _close (cb) { }) } +Multifeed.prototype._updateStorageIndex = function (cb) { + if (typeof cb !== 'function') cb = function noop () {} + + var self = this + + var dirs = Object.keys(self._dirs).join(',') + self._max_dir = Math.max.apply(null, Object.keys(self._dirs).map(Number)) + + var st = self._storage('index')('dirs') + writeStringToStorage(dirs, st, cb) +} + Multifeed.prototype._loadFeeds = function (cb) { var self = this - // Hypercores are stored starting at 0 and incrementing by 1. A failed read - // at position 0 implies non-existance of the hypercore. + // Hypercores are stored via an index file in numbers directories. If no index + // is found, the structure is assumed to be legacy which starts at 0 and + // increments by 1. A failed read at position 0 implies non-existance of the + // hypercore and if legacy means the end of loading. + var doneOnErr = true + var nextDir = function (dir) { return dir + 1 } + var pending = 1 - function next (n) { - var storage = self._storage('' + n) + function next (dir) { + if (!dir) return done() + + var storage = self._storage('' + dir) var st = storage('key') st.read(0, 4, function (err) { - if (err) return done() // means there are no more feeds to read - debug(self._id + ' [INIT] loading feed #' + n) + if (err && doneOnErr) return done() // means there are no more feeds to read + debug(self._id + ' [INIT] loading feed #' + dir) pending++ var feed = self._hypercore(storage, self._opts) - process.nextTick(next, n + 1) + process.nextTick(next, nextDir(dir)) feed.ready(function () { readStringFromStorage(storage('localname'), function (err, name) { if (!err && name) { self._addFeed(feed, name) } else { - self._addFeed(feed, String(n)) + self._addFeed(feed, String(dir)) } st.close(function (err) { if (err) return done(err) - debug(self._id + ' [INIT] loaded feed #' + n) + debug(self._id + ' [INIT] loaded feed #' + dir) done() }) }) @@ -212,7 +240,27 @@ Multifeed.prototype._loadFeeds = function (cb) { if (!--pending) cb() } - next(0) + var indexSt = self._storage('index')('names') + + readStringFromStorage(indexSt, function (err, dirs) { + if (err) { + next(0) + } else { + doneOnErr = false + dirs = dirs.split(',') + + nextDir = function (dir) { + var idx = dirs.indexOf(dir) + if (idx < dirs.length - 1) { + return dirs[idx + 1] + } else { + return '' + } + } + + next(dirs[0]) + } + }) } Multifeed.prototype.writer = function (name, opts, cb) { @@ -239,10 +287,10 @@ Multifeed.prototype.writer = function (name, opts, cb) { debug(self._id + ' [WRITER] creating new writer: ' + name) self.writerLock(function (release) { - var len = Object.keys(self._feeds).length - var storage = self._storage('' + len) + var dir = self._max_dir + 1 + var storage = self._storage('' + dir) - var idx = name || String(len) + var idx = name || String(dir) var nameStore = storage('localname') writeStringToStorage(idx, nameStore, function (err) { @@ -262,9 +310,12 @@ Multifeed.prototype.writer = function (name, opts, cb) { feed.ready(function () { self._addFeed(feed, String(idx)) - release(function () { - if (err) cb(err) - else cb(null, feed, idx) + self._dirs[dir] = feed + self._updateStorageIndex(function (err) { + release(function () { + if (err) cb(err) + else cb(null, feed, idx) + }) }) }) }) @@ -370,7 +421,7 @@ Multifeed.prototype.replicate = function (isInitiator, opts) { return !Number.isNaN(parseInt(key, 16)) && key.length === 64 }) - var numFeeds = Object.keys(self._feeds).length + var numFeeds = self._max_dir + 1 var keyId = numFeeds filtered.forEach(function (key) { var feeds = values(self._feeds).filter(function (feed) { @@ -393,6 +444,8 @@ Multifeed.prototype.replicate = function (isInitiator, opts) { } feed.ready(function () { self._addFeed(feed, myKey) + self._dirs[myKey] = feed + self._updateStorageIndex() keyId++ debug(self._id + ' [REPLICATION] succeeded in creating new local hypercore, key=' + key.toString('hex')) if (!--pending) cb() diff --git a/test/regression.js b/test/regression.js index 18d5dd9..64618a1 100644 --- a/test/regression.js +++ b/test/regression.js @@ -356,7 +356,7 @@ test('regression: ensure encryption key is not written to disk', function (t) { t.error(err) fs.readdir(storage, function (err, res) { t.error(err) - t.equals(res.length, 1) + t.equals(res.length, 2) t.equals(res[0], '0') }) }) From 33d68f8ac2ea816ca58d5a942a77a209f3ec89a0 Mon Sep 17 00:00:00 2001 From: Sean Zellmer Date: Thu, 22 Apr 2021 22:22:58 -0500 Subject: [PATCH 3/8] fix: Fix index file typo & test loading persistent storage w/ removal --- index.js | 2 +- test/basic.js | 43 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/index.js b/index.js index 5a46853..7142271 100644 --- a/index.js +++ b/index.js @@ -240,7 +240,7 @@ Multifeed.prototype._loadFeeds = function (cb) { if (!--pending) cb() } - var indexSt = self._storage('index')('names') + var indexSt = self._storage('index')('dirs') readStringFromStorage(indexSt, function (err, dirs) { if (err) { diff --git a/test/basic.js b/test/basic.js index 428d016..44796de 100644 --- a/test/basic.js +++ b/test/basic.js @@ -247,6 +247,49 @@ test('get localfeed by name across disk loads', function (t) { }) }) +test('load all feeds from disk after removing one', function (t) { + t.plan(14) + + var storage = tmp() + var multi = multifeed(storage, { valueEncoding: 'json' }) + + multi.writer('foo', function (err, wFoo) { + t.error(err) + t.ok(wFoo.key) + + multi.writer('bar', function (err, wBar) { + t.error(err) + t.ok(wBar.key) + wBar.append('a') + + multi.writer('baz', function (err, wBaz) { + t.error(err) + t.ok(wBaz.key) + + multi.removeFeed('bar', function () { + t.deepEquals(multi.feeds().length, 2, 'baz successfully deleted') + + multi.close(function () { + var multi2 = multifeed(storage, { valueEncoding: 'json' }) + multi2.writer('foo', function (err, wFoo2) { + t.error(err) + t.ok(wFoo2.key) + t.deepEquals(multi2.feeds().length, 2) + t.deepEquals(wFoo2.key, wFoo.key, 'keys match') + + multi2.writer('baz', function (err, wBaz2) { + t.error(err) + t.ok(wBaz2.key) + t.deepEquals(wBaz2.key, wBaz.key, 'keys match') + }) + }) + }) + }) + }) + }) + }) +}) + test('close', function (t) { var storage = tmp() var multi = multifeed(storage, { valueEncoding: 'json' }) From 6399d01f4ef2fdafca7eb8635abe6ff55fb4a4ea Mon Sep 17 00:00:00 2001 From: Sean Zellmer Date: Thu, 22 Apr 2021 22:24:53 -0500 Subject: [PATCH 4/8] fix: Clear storage when writing w/ `writeStringStorage()` As the previous comment noted, writing `writeStringStorage()` didn't account for writing to a storage with existing data that was longer than the current data. This wasn't a problem until the storage index was implemented. If feeds are removed, then it is very likely that the current data is shorter than previous data. To fix this the storage size is retrieved. If it errors while `stat`ing the file, then its assumed to be non-existent and we just go ahead and write to the file. --- index.js | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/index.js b/index.js index 7142271..72d23fd 100644 --- a/index.js +++ b/index.js @@ -469,12 +469,25 @@ Multifeed.prototype._forwardLiveFeedAnnouncements = function (feed, name) { }) } -// TODO: what if the new data is shorter than the old data? things will break! function writeStringToStorage (string, storage, cb) { - var buf = Buffer.from(string, 'utf8') - storage.write(0, buf, function (err) { - storage.close(function (err2) { - cb(err || err2) + function writeBuffer () { + var buf = Buffer.from(string, 'utf8') + storage.write(0, buf, function (err) { + storage.close(function (err2) { + cb(err || err2) + }) + }) + } + + // Check if data already exists + storage.stat(function (err, stat) { + if (err) return writeBuffer() + + var len = stat.size + storage.del(0, len, function (err) { + if (err) return cb(err) + + writeBuffer() }) }) } From 0d503c2d19cf5ccc5174555d7c68ce13d8e9809a Mon Sep 17 00:00:00 2001 From: Sean Zellmer Date: Thu, 22 Apr 2021 22:42:02 -0500 Subject: [PATCH 5/8] fix: Add feed's storage `dir` to cache when loading This will not update the index file during the loading process, but will allow future updates to the index to include loaded feeds. --- index.js | 1 + 1 file changed, 1 insertion(+) diff --git a/index.js b/index.js index 72d23fd..6e895de 100644 --- a/index.js +++ b/index.js @@ -222,6 +222,7 @@ Multifeed.prototype._loadFeeds = function (cb) { } else { self._addFeed(feed, String(dir)) } + self._dirs[dir] = feed st.close(function (err) { if (err) return done(err) debug(self._id + ' [INIT] loaded feed #' + dir) From 3c33a1d962e51a18af98cd3bd466a5cfa121148e Mon Sep 17 00:00:00 2001 From: Sean Zellmer Date: Wed, 25 Aug 2021 18:52:20 -0500 Subject: [PATCH 6/8] fix: Correct check for end of feeds being loaded by checking if number The initial `!dir` check in `next()` would bail when loading index `0` which is the first feed in the legacy storage structure. All subsequent feeds would be lost. --- index.js | 2 +- test/basic.js | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/index.js b/index.js index 6e895de..d9b8ccb 100644 --- a/index.js +++ b/index.js @@ -204,7 +204,7 @@ Multifeed.prototype._loadFeeds = function (cb) { var pending = 1 function next (dir) { - if (!dir) return done() + if (!dir && typeof dir !== 'number') return done() var storage = self._storage('' + dir) var st = storage('key') diff --git a/test/basic.js b/test/basic.js index 44796de..0d2ffb9 100644 --- a/test/basic.js +++ b/test/basic.js @@ -290,6 +290,43 @@ test('load all feeds from disk after removing one', function (t) { }) }) +test('load all feeds from legacy storage', function (t) { + t.plan(9) + + var storage = tmp() + var multi = multifeed(storage, { valueEncoding: 'json' }) + + // Create a legacy storage structure by creating two feeds & deleting the + // index. + multi.writer(function (err, wFoo) { + t.error(err, 'no error getting writer') + t.ok(wFoo.key, 'got a key') + + // Add second feed to highlight that feeds aren't loaded + // This is necessary because `writer()` implicitly loads a feed by + // "creating" it. + multi.writer('foo', function (err, wBar) { + t.error(err, 'no error getting second feed') + t.ok(wBar.key, 'got a key') + t.deepEquals(multi.feeds().length, 2, 'now have 2 feeds') + + // Remove index to replicate legacy storage configuration + multi._storage('index')('dirs').destroy(function () { + multi.close(function () { + var multi2 = multifeed(storage, { valueEncoding: 'json' }) + multi2.writer(function (err, wFoo2) { + t.error(err) + t.ok(wFoo2.key) + console.log('multi2._feeds', multi2._feeds) + t.deepEquals(multi2.feeds().length, 2) + t.deepEquals(wFoo2.key, wFoo.key, 'keys match') + }) + }) + }) + }) + }) +}) + test('close', function (t) { var storage = tmp() var multi = multifeed(storage, { valueEncoding: 'json' }) From a2004f8469f1e2d3e5e8239b93a1ac523722cab3 Mon Sep 17 00:00:00 2001 From: Sean Zellmer Date: Sun, 29 Aug 2021 23:38:28 -0500 Subject: [PATCH 7/8] fix: Update `_max_dir` when loading feeds It was assumed previously that the max could be loaded whenever a feed was added or removed, but replicated feeds didn't refresh the max dir index before creating a new feed with the default starting index `0`. --- index.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/index.js b/index.js index d9b8ccb..4ae33f7 100644 --- a/index.js +++ b/index.js @@ -250,6 +250,9 @@ Multifeed.prototype._loadFeeds = function (cb) { doneOnErr = false dirs = dirs.split(',') + // Update max dir on load + self._max_dir = Math.max.apply(null, dirs.map(Number)) + nextDir = function (dir) { var idx = dirs.indexOf(dir) if (idx < dirs.length - 1) { From 1e2ffc267bd54dc6bdab7810f5410e5d55e4ad60 Mon Sep 17 00:00:00 2001 From: Sean Zellmer Date: Sat, 26 Mar 2022 18:01:59 -0500 Subject: [PATCH 8/8] fix: Account for blank index file when loading feeds An empty string was being converted to `0` and caused the numeric feed names to start at `1` instead of `0`. --- index.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/index.js b/index.js index 4ae33f7..595a95c 100644 --- a/index.js +++ b/index.js @@ -248,10 +248,10 @@ Multifeed.prototype._loadFeeds = function (cb) { next(0) } else { doneOnErr = false - dirs = dirs.split(',') + dirs = dirs ? dirs.split(',') : [] // Update max dir on load - self._max_dir = Math.max.apply(null, dirs.map(Number)) + self._max_dir = Math.max.apply(null, dirs.map(Number).concat(self._max_dir)) nextDir = function (dir) { var idx = dirs.indexOf(dir)