From a51dc5cba224281c7002dc82239402e521b64a46 Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Thu, 5 Oct 2023 21:47:39 +0100 Subject: [PATCH 1/8] feat: add capabilities.getAll() (#326) --- src/capabilities.js | 38 ++++++++++++++++++++++ test-e2e/capabilities.js | 68 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+) diff --git a/src/capabilities.js b/src/capabilities.js index 64d8345f9..05d8b4485 100644 --- a/src/capabilities.js +++ b/src/capabilities.js @@ -199,6 +199,44 @@ export class Capabilities { return capabilities } + /** + * Get capabilities of all devices in the project. For your own device, if you + * have not yet synced your own role record, the "no role" capabilties is + * returned. The project creator will have `CREATOR_CAPABILITIES` unless a + * different role has been assigned. + * + * @returns {Promise>} Map of deviceId to Capability + */ + async getAll() { + const roles = await this.#dataType.getMany() + let projectCreatorDeviceId + try { + projectCreatorDeviceId = await this.#coreOwnership.getOwner( + this.#projectCreatorAuthCoreId + ) + } catch (e) { + // Not found, we don't know who the project creator is so we can't include + // them in the returned map + } + /** @type {Record} */ + const capabilities = {} + for (const role of roles) { + const deviceId = role.docId + if (!isKnownRoleId(role.roleId)) continue + capabilities[deviceId] = DEFAULT_CAPABILITIES[role.roleId] + } + const includesSelf = Boolean(capabilities[this.#ownDeviceId]) + if (!includesSelf) { + const isProjectCreator = this.#ownDeviceId === projectCreatorDeviceId + if (isProjectCreator) { + capabilities[this.#ownDeviceId] = CREATOR_CAPABILITIES + } else { + capabilities[this.#ownDeviceId] = NO_ROLE_CAPABILITIES + } + } + return capabilities + } + /** * Assign a role to the specified `deviceId`. Devices without an assigned role * are unable to sync, except the project creator that defaults to having all diff --git a/test-e2e/capabilities.js b/test-e2e/capabilities.js index 3192c85e8..7019857fa 100644 --- a/test-e2e/capabilities.js +++ b/test-e2e/capabilities.js @@ -7,6 +7,8 @@ import { DEFAULT_CAPABILITIES, CREATOR_CAPABILITIES, MEMBER_ROLE_ID, + COORDINATOR_ROLE_ID, + NO_ROLE_CAPABILITIES, } from '../src/capabilities.js' import { randomBytes } from 'crypto' @@ -72,3 +74,69 @@ test('New device without capabilities', async (t) => { await project[kCapabilities].assignRole(deviceId, MEMBER_ROLE_ID) }, 'Trying to assign a role without capabilities throws an error') }) + +test('getMany() - on invitor device', async (t) => { + const rootKey = KeyManager.generateRootKey() + const km = new KeyManager(rootKey) + const creatorDeviceId = km.getIdentityKeypair().publicKey.toString('hex') + const manager = new MapeoManager({ + rootKey, + dbFolder: ':memory:', + coreStorage: () => new RAM(), + }) + + const projectId = await manager.createProject() + const project = await manager.getProject(projectId) + const ownCapabilities = await project.$getOwnCapabilities() + + t.alike( + ownCapabilities, + CREATOR_CAPABILITIES, + 'Project creator has creator capabilities' + ) + + const deviceId1 = randomBytes(32).toString('hex') + const deviceId2 = randomBytes(32).toString('hex') + await project[kCapabilities].assignRole(deviceId1, MEMBER_ROLE_ID) + await project[kCapabilities].assignRole(deviceId2, COORDINATOR_ROLE_ID) + + const expected = { + [deviceId1]: DEFAULT_CAPABILITIES[MEMBER_ROLE_ID], + [deviceId2]: DEFAULT_CAPABILITIES[COORDINATOR_ROLE_ID], + [creatorDeviceId]: CREATOR_CAPABILITIES, + } + + t.alike( + await project[kCapabilities].getAll(), + expected, + 'expected capabilities' + ) +}) + +test('getMany() - on newly invited device before sync', async (t) => { + const rootKey = KeyManager.generateRootKey() + const km = new KeyManager(rootKey) + const deviceId = km.getIdentityKeypair().publicKey.toString('hex') + const manager = new MapeoManager({ + rootKey, + dbFolder: ':memory:', + coreStorage: () => new RAM(), + }) + + const projectId = await manager.addProject({ + projectKey: randomBytes(32), + encryptionKeys: { auth: randomBytes(32) }, + }) + const project = await manager.getProject(projectId) + await project.ready() + + const expected = { + [deviceId]: NO_ROLE_CAPABILITIES, + } + + t.alike( + await project[kCapabilities].getAll(), + expected, + 'expected capabilities' + ) +}) From 37dbd22e06c7779f0f4f694e8a7e39556a54ce16 Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Mon, 9 Oct 2023 15:01:56 +0100 Subject: [PATCH 2/8] chore: run CI on PRs against any branch (#330) Currently CI only runs for PRs against `main`, which means it does not run for stacked PRs. This changes the CI workflow to run on all PRs --- .github/workflows/node.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/node.yml b/.github/workflows/node.yml index e469eb7e0..cf8d7fcd9 100644 --- a/.github/workflows/node.yml +++ b/.github/workflows/node.yml @@ -7,7 +7,6 @@ on: push: branches: [main] pull_request: - branches: [main] jobs: build: From 05b0830d118951fb55ab6b742b242fc0057eecfd Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Mon, 9 Oct 2023 18:00:32 +0100 Subject: [PATCH 3/8] feat: Add NamespaceSyncState (#313) * chore: emit peer-have messages don't persist Fixes #309, removes responsibility from CoreManager to track peer-have messages - will be handled by SyncState class * feat: Add NamespaceSyncState This combines all core sync states for a namespace. It listens to CoreManager for new cores and pre-have messages. * feat: add namespace to peer pre-have messages * fix param name * WIP tests * add tests * don't use eventEmitter Since these are internal modules and we don't attach a "listener" other than in the constructor, switching to a pattern that passes an 'onUpdate' constructor param, that avoids needing to track when event listeners are removed --- package-lock.json | 16 +++ package.json | 2 + src/sync/core-sync-state.js | 42 +++----- src/sync/namespace-sync-state.js | 102 ++++++++++++++++++ tests/helpers/core-manager.js | 5 + tests/helpers/replication-state.js | 17 +-- tests/sync/core-sync-state.js | 15 ++- tests/sync/namespace-sync-state.js | 167 +++++++++++++++++++++++++++++ 8 files changed, 326 insertions(+), 40 deletions(-) create mode 100644 src/sync/namespace-sync-state.js create mode 100644 tests/sync/namespace-sync-state.js diff --git a/package-lock.json b/package-lock.json index c01a0cdf0..a7630653a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -45,6 +45,7 @@ "sodium-universal": "^4.0.0", "start-stop-state-machine": "^1.2.0", "sub-encoder": "^2.1.1", + "throttle-debounce": "^5.0.0", "tiny-typed-emitter": "^2.1.0", "varint": "^6.0.0", "z32": "^1.0.1" @@ -59,6 +60,7 @@ "@types/node": "^18.16.3", "@types/sinonjs__fake-timers": "^8.1.2", "@types/streamx": "^2.9.1", + "@types/throttle-debounce": "^5.0.0", "@types/varint": "^6.0.1", "bitfield": "^4.1.0", "brittle": "^3.2.1", @@ -1126,6 +1128,12 @@ "@types/node": "*" } }, + "node_modules/@types/throttle-debounce": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/@types/throttle-debounce/-/throttle-debounce-5.0.0.tgz", + "integrity": "sha512-Pb7k35iCGFcGPECoNE4DYp3Oyf2xcTd3FbFQxXUI9hEYKUl6YX+KLf7HrBmgVcD05nl50LIH6i+80js4iYmWbw==", + "dev": true + }, "node_modules/@types/varint": { "version": "6.0.1", "resolved": "https://registry.npmjs.org/@types/varint/-/varint-6.0.1.tgz", @@ -7735,6 +7743,14 @@ "real-require": "^0.2.0" } }, + "node_modules/throttle-debounce": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/throttle-debounce/-/throttle-debounce-5.0.0.tgz", + "integrity": "sha512-2iQTSgkkc1Zyk0MeVrt/3BvuOXYPl/R8Z0U2xxo9rjwNciaHDG3R+Lm6dh4EeUci49DanvBnuqI6jshoQQRGEg==", + "engines": { + "node": ">=12.22" + } + }, "node_modules/thunky": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/thunky/-/thunky-1.1.0.tgz", diff --git a/package.json b/package.json index 5448adcd4..6028cd339 100644 --- a/package.json +++ b/package.json @@ -76,6 +76,7 @@ "@types/node": "^18.16.3", "@types/sinonjs__fake-timers": "^8.1.2", "@types/streamx": "^2.9.1", + "@types/throttle-debounce": "^5.0.0", "@types/varint": "^6.0.1", "bitfield": "^4.1.0", "brittle": "^3.2.1", @@ -142,6 +143,7 @@ "sodium-universal": "^4.0.0", "start-stop-state-machine": "^1.2.0", "sub-encoder": "^2.1.1", + "throttle-debounce": "^5.0.0", "tiny-typed-emitter": "^2.1.0", "varint": "^6.0.0", "z32": "^1.0.1" diff --git a/src/sync/core-sync-state.js b/src/sync/core-sync-state.js index 2f5a100b9..9ca1df587 100644 --- a/src/sync/core-sync-state.js +++ b/src/sync/core-sync-state.js @@ -1,4 +1,3 @@ -import { TypedEmitter } from 'tiny-typed-emitter' import { keyToId } from '../utils.js' import RemoteBitfield, { BITS_PER_PAGE, @@ -32,10 +31,6 @@ import RemoteBitfield, { * @property {PeerSimpleState} localState local state * @property {Record} remoteStates map of state of all known peers */ -/** - * @typedef {object} CoreSyncEvents - * @property {() => void} update - */ /** * Track sync state for a core identified by `discoveryId`. Can start tracking @@ -43,9 +38,9 @@ import RemoteBitfield, { * received over the project creator core. * * Because deriving the state is expensive (it iterates through the bitfields of - * all peers), this is designed to be pull-based: an `update` event signals that - * the state is updated, but does not pass the state. The consumer can "pull" - * the state when it wants it via `coreSyncState.getState()`. + * all peers), this is designed to be pull-based: the onUpdate event signals + * that the state is updated, but does not pass the state. The consumer can + * "pull" the state when it wants it via `coreSyncState.getState()`. * * Each peer (including the local peer) has a state of: * 1. `have` - number of blocks the peer has locally @@ -53,25 +48,28 @@ import RemoteBitfield, { * 3. `wanted` - number of blocks the peer has that at least one peer wants * 4. `missing` - number of blocks the peer wants but no peer has * - * @extends {TypedEmitter} */ -export class CoreSyncState extends TypedEmitter { +export class CoreSyncState { /** @type {import('hypercore')<'binary', Buffer>} */ #core /** @type {InternalState['remoteStates']} */ #remoteStates = new Map() /** @type {InternalState['localState']} */ #localState = new PeerState() - #discoveryId /** @type {DerivedState | null} */ #cachedState = null + #update /** - * @param {string} discoveryId Discovery ID for the core that this is representing + * @param {() => void} onUpdate Called when a state update is available (via getState()) */ - constructor(discoveryId) { - super() - this.#discoveryId = discoveryId + constructor(onUpdate) { + // Called whenever the state changes, so we clear the cache because next + // call to getState() will need to re-derive the state + this.#update = () => { + this.#cachedState = null + process.nextTick(onUpdate) + } } /** @type {() => DerivedState} */ @@ -84,15 +82,6 @@ export class CoreSyncState extends TypedEmitter { }) } - /** - * Called whenever the state changes, so we clear the cache because next call - * to getState() will need to re-derive the state - */ - #update() { - this.#cachedState = null - this.emit('update') - } - /** * Attach a core. The sync state can be initialized without a core instance, * because we could receive peer want and have states via extension messages @@ -101,11 +90,6 @@ export class CoreSyncState extends TypedEmitter { * @param {import('hypercore')<'binary', Buffer>} core */ attachCore(core) { - // @ts-ignore - we know discoveryKey exists here - const discoveryId = keyToId(core.discoveryKey) - if (discoveryId !== this.#discoveryId) { - throw new Error('discoveryId does not match') - } if (this.#core) return this.#core = core diff --git a/src/sync/namespace-sync-state.js b/src/sync/namespace-sync-state.js new file mode 100644 index 000000000..169c42303 --- /dev/null +++ b/src/sync/namespace-sync-state.js @@ -0,0 +1,102 @@ +import { CoreSyncState } from './core-sync-state.js' +import { discoveryKey } from 'hypercore-crypto' + +/** + * @typedef {object} PeerSyncState + * @property {number} have + * @property {number} want + * @property {number} wanted + * @property {number} missing + */ + +/** + * @typedef {object} SyncState + * @property {PeerSyncState} localState + */ + +/** + * @template {import('../core-manager/index.js').Namespace} [TNamespace=import('../core-manager/index.js').Namespace] + */ +export class NamespaceSyncState { + /** @type {Map} */ + #coreStates = new Map() + #handleUpdate + #namespace + + /** + * @param {object} opts + * @param {TNamespace} opts.namespace + * @param {import('../core-manager/index.js').CoreManager} opts.coreManager + * @param {() => void} opts.onUpdate Called when a state update is available (via getState()) + */ + constructor({ namespace, coreManager, onUpdate }) { + this.#namespace = namespace + this.#handleUpdate = onUpdate + + for (const { core, key } of coreManager.getCores(namespace)) { + this.#addCore(core, key) + } + + coreManager.on('add-core', ({ core, namespace, key }) => { + if (namespace !== this.#namespace) return + this.#addCore(core, key) + }) + + coreManager.on('peer-have', (namespace, msg) => { + if (namespace !== this.#namespace) return + this.#insertPreHaves(msg) + }) + } + + get namespace() { + return this.#namespace + } + + /** @returns {SyncState} */ + getState() { + const state = { + localState: { have: 0, want: 0, wanted: 0, missing: 0 }, + } + for (const crs of this.#coreStates.values()) { + const { localState } = crs.getState() + state.localState.have += localState.have + state.localState.want += localState.want + state.localState.wanted += localState.wanted + state.localState.missing += localState.missing + } + return state + } + + /** + * @param {import('hypercore')<"binary", Buffer>} core + * @param {Buffer} coreKey + */ + #addCore(core, coreKey) { + const discoveryId = discoveryKey(coreKey).toString('hex') + this.#getCoreState(discoveryId).attachCore(core) + } + + /** + * @param {{ + * peerId: string, + * start: number, + * coreDiscoveryId: string, + * bitfield: Uint32Array + * }} opts + */ + #insertPreHaves({ peerId, start, coreDiscoveryId, bitfield }) { + this.#getCoreState(coreDiscoveryId).insertPreHaves(peerId, start, bitfield) + } + + /** + * @param {string} discoveryId + */ + #getCoreState(discoveryId) { + let coreState = this.#coreStates.get(discoveryId) + if (!coreState) { + coreState = new CoreSyncState(this.#handleUpdate) + this.#coreStates.set(discoveryId, coreState) + } + return coreState + } +} diff --git a/tests/helpers/core-manager.js b/tests/helpers/core-manager.js index be439481b..0269c6688 100644 --- a/tests/helpers/core-manager.js +++ b/tests/helpers/core-manager.js @@ -6,6 +6,11 @@ import { KeyManager } from '@mapeo/crypto' import RAM from 'random-access-memory' import NoiseSecretStream from '@hyperswarm/secret-stream' +/** + * + * @param {Partial[0]> & { rootKey?: Buffer }} param0 + * @returns + */ export function createCoreManager({ rootKey = randomBytes(16), projectKey = randomBytes(32), diff --git a/tests/helpers/replication-state.js b/tests/helpers/replication-state.js index c71cebe51..19f1e430f 100644 --- a/tests/helpers/replication-state.js +++ b/tests/helpers/replication-state.js @@ -1,7 +1,6 @@ import NoiseSecretStream from '@hyperswarm/secret-stream' import { truncateId } from '../../src/utils.js' -import { getKeys } from './core-manager.js' export function logState(syncState, name) { let message = `${name ? name + ' ' : ''}${ @@ -37,13 +36,19 @@ export async function download( { start = 0, end = -1 } = {} ) { const writer = coreManager.getWriterCore(namespace) - const keys = getKeys(coreManager, namespace) - - for (const key of keys) { + const donePromises = [] + for (const { core, key } of coreManager.getCores(namespace)) { if (key.equals(writer.core.key)) continue - const core = coreManager.getCoreByKey(key) - await core.download({ start, end, ifAvailable: true }).done() + donePromises.push(core.download({ start, end, ifAvailable: true }).done()) } + if (end !== -1) return Promise.all(donePromises) + return new Promise(() => { + coreManager.on('add-core', (coreRecord) => { + console.log('add-core') + if (coreRecord.namespace !== namespace) return + coreRecord.core.download({ start, end, ifAvailable: true }).done() + }) + }) } export async function downloadCore( diff --git a/tests/sync/core-sync-state.js b/tests/sync/core-sync-state.js index b40673dd6..f579473f5 100644 --- a/tests/sync/core-sync-state.js +++ b/tests/sync/core-sync-state.js @@ -14,6 +14,7 @@ import { createCore } from '../helpers/index.js' // import { setTimeout } from 'timers/promises' import { once } from 'node:events' import pTimeout from 'p-timeout' +import { EventEmitter } from 'node:events' /** * @type {Array<{ @@ -208,7 +209,8 @@ test('CoreReplicationState', async (t) => { for (const { state, expected, message } of scenarios) { const localCore = await createCore() await localCore.ready() - const crs = new CoreSyncState(localCore.discoveryKey.toString('hex')) + const emitter = new EventEmitter() + const crs = new CoreSyncState(() => emitter.emit('update')) crs.attachCore(localCore) const blocks = new Array(state.length).fill('block') await localCore.append(blocks) @@ -249,7 +251,7 @@ test('CoreReplicationState', async (t) => { connected: connectedState.get(peerId), } } - await updateWithTimeout(crs, 100) + await updateWithTimeout(emitter, 100) t.alike( crs.getState(), { ...expected, remoteStates: expectedRemoteStates }, @@ -383,11 +385,14 @@ function setPeerWants(state, peerId, bits) { /** * Wait for update event with a timeout - * @param {CoreSyncState} state + * @param {EventEmitter} updateEmitter * @param {number} milliseconds */ -async function updateWithTimeout(state, milliseconds) { - return pTimeout(once(state, 'update'), { milliseconds, message: false }) +async function updateWithTimeout(updateEmitter, milliseconds) { + return pTimeout(once(updateEmitter, 'update'), { + milliseconds, + message: false, + }) } /** diff --git a/tests/sync/namespace-sync-state.js b/tests/sync/namespace-sync-state.js new file mode 100644 index 000000000..091f33fe6 --- /dev/null +++ b/tests/sync/namespace-sync-state.js @@ -0,0 +1,167 @@ +//@ts-check +import test from 'brittle' +import { KeyManager } from '@mapeo/crypto' +import { NamespaceSyncState } from '../../src/sync/namespace-sync-state.js' +import { + createCoreManager, + waitForCores, + getKeys, +} from '../helpers/core-manager.js' +import { replicate } from '../helpers/replication-state.js' + +test('sync cores in a namespace', async function (t) { + t.plan(2) + const projectKeyPair = KeyManager.generateProjectKeypair() + + const cm1 = createCoreManager({ + projectKey: projectKeyPair.publicKey, + projectSecretKey: projectKeyPair.secretKey, + }) + const cm2 = createCoreManager({ projectKey: projectKeyPair.publicKey }) + + replicate(cm1, cm2) + + await Promise.all([ + waitForCores(cm1, getKeys(cm2, 'auth')), + waitForCores(cm2, getKeys(cm1, 'auth')), + ]) + + let syncState1Synced = false + let syncState2Synced = false + + const syncState1 = new NamespaceSyncState({ + coreManager: cm1, + namespace: 'auth', + onUpdate: () => { + const { localState } = syncState1.getState() + if ( + localState.want === 0 && + localState.wanted === 0 && + localState.have === 30 && + localState.missing === 10 && + !syncState1Synced + ) { + t.pass('syncState1 is synced') + syncState1Synced = true + } + }, + }) + + const syncState2 = new NamespaceSyncState({ + coreManager: cm2, + namespace: 'auth', + onUpdate: () => { + const { localState } = syncState2.getState() + if ( + localState.want === 0 && + localState.wanted === 0 && + localState.have === 30 && + localState.missing === 10 && + !syncState2Synced + ) { + t.pass('syncState2 is synced') + syncState2Synced = true + } + }, + }) + + const cm1Keys = getKeys(cm1, 'auth') + const cm2Keys = getKeys(cm2, 'auth') + + const writer1 = cm1.getWriterCore('auth') + await writer1.core.append(Array(20).fill('block')) + await writer1.core.clear(0, 10) + + const writer2 = cm2.getWriterCore('auth') + await writer2.core.append(Array(20).fill('block')) + + for (const key of cm1Keys) { + if (key.equals(writer1.core.key)) continue + const core = cm1.getCoreByKey(key) + core.download({ start: 0, end: -1 }) + } + + for (const key of cm2Keys) { + if (key.equals(writer2.core.key)) continue + const core = cm2.getCoreByKey(key) + core.download({ start: 0, end: -1 }) + } +}) + +test('replicate with updating data', async function (t) { + t.plan(2) + const fillLength = 5000 + + const projectKeyPair = KeyManager.generateProjectKeypair() + + const cm1 = createCoreManager({ + projectKey: projectKeyPair.publicKey, + projectSecretKey: projectKeyPair.secretKey, + }) + const cm2 = createCoreManager({ projectKey: projectKeyPair.publicKey }) + + const writer1 = cm1.getWriterCore('auth') + for (let i = 0; i < fillLength; i = i + 100) { + const blocks = new Array(100).fill(null).map((b, i) => `block ${i}`) + writer1.core.append(blocks) + } + + const writer2 = cm2.getWriterCore('auth') + for (let i = 0; i < fillLength; i = i + 100) { + const blocks = new Array(100).fill(null).map((b, i) => `block ${i}`) + writer2.core.append(blocks) + } + + replicate(cm1, cm2) + + await Promise.all([ + waitForCores(cm1, getKeys(cm2, 'auth')), + waitForCores(cm2, getKeys(cm1, 'auth')), + ]) + + let syncState1AlreadyDone = false + let syncState2AlreadyDone = false + + const syncState1 = new NamespaceSyncState({ + coreManager: cm1, + namespace: 'auth', + onUpdate: () => { + const { localState } = syncState1.getState() + const synced = + localState.wanted === 0 && localState.have === fillLength * 2 + if (synced && !syncState1AlreadyDone) { + t.ok(synced, 'syncState1 is synced') + syncState1AlreadyDone = true + } + }, + }) + + const syncState2 = new NamespaceSyncState({ + coreManager: cm2, + namespace: 'auth', + onUpdate: () => { + const { localState } = syncState2.getState() + const synced = + localState.wanted === 0 && localState.have === fillLength * 2 + if (synced && !syncState2AlreadyDone) { + t.ok(synced, 'syncState2 is synced') + syncState2AlreadyDone = true + } + }, + }) + + const cm1Keys = getKeys(cm1, 'auth') + const cm2Keys = getKeys(cm2, 'auth') + + for (const key of cm1Keys) { + if (key.equals(writer1.core.key)) continue + const core = cm1.getCoreByKey(key) + core.download({ live: true, start: 0, end: -1 }) + } + + for (const key of cm2Keys) { + if (key.equals(writer2.core.key)) continue + const core = cm2.getCoreByKey(key) + core.download({ live: true, start: 0, end: -1 }) + } +}) From c9b135d2dafe6d6379aaa5b1305da51e5416197f Mon Sep 17 00:00:00 2001 From: tomasciccola <117094913+tomasciccola@users.noreply.github.com> Date: Mon, 9 Oct 2023 15:15:31 -0300 Subject: [PATCH 4/8] Feat: add DataStore.writeRaw method (#334) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add initial Implementation of `dataStore.writeRaw` * add `dataStore.readRaw` and test * remove `swap16` to avoid linting errors * change `readRaw` error to 'core not found' --------- Co-authored-by: Tomás Ciccola --- src/datastore/index.js | 22 ++++++++++++++++++++++ tests/datastore.js | 18 ++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/src/datastore/index.js b/src/datastore/index.js index 44b597c3e..4655c0c06 100644 --- a/src/datastore/index.js +++ b/src/datastore/index.js @@ -170,4 +170,26 @@ export class DataStore extends TypedEmitter { if (!block) throw new Error('Not Found') return decode(block, { coreDiscoveryKey, index }) } + + /** @param {Buffer} buf} */ + async writeRaw(buf) { + const { length } = await this.#writerCore.append(buf) + const index = length - 1 + const coreDiscoveryKey = this.#writerCore.discoveryKey + if (!coreDiscoveryKey) { + throw new Error('Writer core is not ready') + } + const versionId = getVersionId({ coreDiscoveryKey, index }) + return versionId + } + + /** @param {string} versionId */ + async readRaw(versionId) { + const { coreDiscoveryKey, index } = parseVersionId(versionId) + const core = this.#coreManager.getCoreByDiscoveryKey(coreDiscoveryKey) + if (!core) throw new Error('core not found') + const block = await core.get(index, { wait: false }) + if (!block) throw new Error('Not Found') + return block + } } diff --git a/tests/datastore.js b/tests/datastore.js index 8854faa65..ac1bb1b7b 100644 --- a/tests/datastore.js +++ b/tests/datastore.js @@ -60,6 +60,24 @@ test('read and write', async (t) => { ) }) +test('writeRaw and read', async (t) => { + const cm = createCoreManager() + const writerCore = cm.getWriterCore('config').core + await writerCore.ready() + const dataStore = new DataStore({ + coreManager: cm, + namespace: 'config', + batch: async () => { + await new Promise((res) => setTimeout(res, 10)) + }, + storage: () => new RAM(), + }) + const buf = Buffer.from('myblob') + const versionId = await dataStore.writeRaw(buf) + const expectedBuf = await dataStore.readRaw(versionId) + t.alike(buf, expectedBuf) +}) + test('index events', async (t) => { const cm = createCoreManager() const writerCore = cm.getWriterCore('data').core From 57fdedf2def93bf17d36c56fa525d59c857a0a92 Mon Sep 17 00:00:00 2001 From: Andrew Chou Date: Wed, 11 Oct 2023 11:24:23 -0400 Subject: [PATCH 5/8] fix: write own device info when creating and adding projects (#297) --- src/mapeo-manager.js | 27 +++- src/mapeo-project.js | 36 +++++ src/member-api.js | 75 +++++++-- test-e2e/device-info.js | 97 +++++++++++- test-e2e/members.js | 207 +++++++++++++++++++++++++ tests/member-api.js | 328 ---------------------------------------- 6 files changed, 428 insertions(+), 342 deletions(-) create mode 100644 test-e2e/members.js delete mode 100644 tests/member-api.js diff --git a/src/mapeo-manager.js b/src/mapeo-manager.js index a62d7df10..23842de7e 100644 --- a/src/mapeo-manager.js +++ b/src/mapeo-manager.js @@ -7,7 +7,7 @@ import { drizzle } from 'drizzle-orm/better-sqlite3' import { migrate } from 'drizzle-orm/better-sqlite3/migrator' import Hypercore from 'hypercore' import { IndexWriter } from './index-writer/index.js' -import { MapeoProject } from './mapeo-project.js' +import { MapeoProject, kSetOwnDeviceInfo } from './mapeo-project.js' import { localDeviceInfoTable, projectKeysTable, @@ -219,10 +219,16 @@ export class MapeoManager { // 5. Write project name and any other relevant metadata to project instance await project.$setProjectSettings(settings) + // 6. Write device info into project + const deviceInfo = await this.getDeviceInfo() + if (deviceInfo.name) { + await project[kSetOwnDeviceInfo]({ name: deviceInfo.name }) + } + // TODO: Close the project instance instead of keeping it around this.#activeProjects.set(projectPublicId, project) - // 6. Return project public id + // 7. Return project public id return projectPublicId } @@ -365,6 +371,14 @@ export class MapeoManager { projectInfo, }) + // 5. Write device info into project + const deviceInfo = await this.getDeviceInfo() + + if (deviceInfo.name) { + const project = await this.getProject(projectPublicId) + await project[kSetOwnDeviceInfo]({ name: deviceInfo.name }) + } + return projectPublicId } @@ -382,6 +396,15 @@ export class MapeoManager { set: values, }) .run() + + const listedProjects = await this.listProjects() + + await Promise.all( + listedProjects.map(async ({ projectId }) => { + const project = await this.getProject(projectId) + await project[kSetOwnDeviceInfo](deviceInfo) + }) + ) } /** diff --git a/src/mapeo-project.js b/src/mapeo-project.js index 19f570c06..feb4e1054 100644 --- a/src/mapeo-project.js +++ b/src/mapeo-project.js @@ -39,6 +39,7 @@ const CORESTORE_STORAGE_FOLDER_NAME = 'corestore' const INDEXER_STORAGE_FOLDER_NAME = 'indexer' export const kCoreOwnership = Symbol('coreOwnership') export const kCapabilities = Symbol('capabilities') +export const kSetOwnDeviceInfo = Symbol('kSetOwnDeviceInfo') export class MapeoProject { #projectId @@ -107,6 +108,7 @@ export class MapeoProject { storage: coreManagerStorage, sqlite, }) + const indexWriter = new IndexWriter({ tables: [ observationTable, @@ -114,6 +116,7 @@ export class MapeoProject { fieldTable, coreOwnershipTable, roleTable, + deviceInfoTable, ], sqlite, getWinner, @@ -225,7 +228,9 @@ export class MapeoProject { }) this.#memberApi = new MemberApi({ + deviceId: this.#deviceId, capabilities: this.#capabilities, + coreOwnership: this.#coreOwnership, // @ts-expect-error encryptionKeys, projectKey, @@ -273,6 +278,10 @@ export class MapeoProject { return this.#capabilities } + get deviceId() { + return this.#deviceId + } + /** * Resolves when hypercores have all loaded */ @@ -376,6 +385,33 @@ export class MapeoProject { async $getOwnCapabilities() { return this.#capabilities.getCapabilities(this.#deviceId) } + + /** + * @param {Pick} value + * @returns {Promise} + */ + async [kSetOwnDeviceInfo](value) { + const { deviceInfo } = this.#dataTypes + + const configCoreId = this.#coreManager + .getWriterCore('config') + .key.toString('hex') + + let existingDoc + try { + existingDoc = await deviceInfo.getByDocId(configCoreId) + } catch (err) { + return await deviceInfo[kCreateWithDocId](configCoreId, { + ...value, + schemaName: 'deviceInfo', + }) + } + + return deviceInfo.update(existingDoc.versionId, { + ...value, + schemaName: 'deviceInfo', + }) + } } /** diff --git a/src/member-api.js b/src/member-api.js index 83fabe6db..d79b22016 100644 --- a/src/member-api.js +++ b/src/member-api.js @@ -4,10 +4,12 @@ import { projectKeyToId } from './utils.js' /** @typedef {import('./datatype/index.js').DataType, typeof import('./schema/project.js').deviceInfoTable, "deviceInfo", import('@mapeo/schema').DeviceInfo, import('@mapeo/schema').DeviceInfoValue>} DeviceInfoDataType */ /** @typedef {import('./datatype/index.js').DataType, typeof import('./schema/client.js').projectSettingsTable, "projectSettings", import('@mapeo/schema').ProjectSettings, import('@mapeo/schema').ProjectSettingsValue>} ProjectDataType */ -/** @typedef {{ deviceId: string, name: import('@mapeo/schema').DeviceInfo['name'], capabilities: import('./capabilities.js').Capability }} MemberInfo */ +/** @typedef {{ deviceId: string, name?: import('@mapeo/schema').DeviceInfo['name'], capabilities: import('./capabilities.js').Capability }} MemberInfo */ export class MemberApi extends TypedEmitter { + #ownDeviceId #capabilities + #coreOwnership #encryptionKeys #projectKey #rpc @@ -15,7 +17,9 @@ export class MemberApi extends TypedEmitter { /** * @param {Object} opts + * @param {string} opts.deviceId public key of this device as hex string * @param {import('./capabilities.js').Capabilities} opts.capabilities + * @param {import('./core-ownership.js').CoreOwnership} opts.coreOwnership * @param {import('./generated/keys.js').EncryptionKeys} opts.encryptionKeys * @param {Buffer} opts.projectKey * @param {import('./rpc/index.js').MapeoRPC} opts.rpc @@ -23,9 +27,19 @@ export class MemberApi extends TypedEmitter { * @param {Pick} opts.dataTypes.deviceInfo * @param {Pick} opts.dataTypes.project */ - constructor({ capabilities, encryptionKeys, projectKey, rpc, dataTypes }) { + constructor({ + deviceId, + capabilities, + coreOwnership, + encryptionKeys, + projectKey, + rpc, + dataTypes, + }) { super() + this.#ownDeviceId = deviceId this.#capabilities = capabilities + this.#coreOwnership = coreOwnership this.#encryptionKeys = encryptionKeys this.#projectKey = projectKey this.#rpc = rpc @@ -65,24 +79,63 @@ export class MemberApi extends TypedEmitter { * @returns {Promise} */ async getById(deviceId) { - const { name } = await this.#dataTypes.deviceInfo.getByDocId(deviceId) const capabilities = await this.#capabilities.getCapabilities(deviceId) - return { deviceId, name, capabilities } + + /** @type {MemberInfo} */ + const result = { deviceId, capabilities } + + try { + const configCoreId = await this.#coreOwnership.getCoreId( + deviceId, + 'config' + ) + + const deviceInfo = await this.#dataTypes.deviceInfo.getByDocId( + configCoreId + ) + + result.name = deviceInfo.name + } catch (err) { + // Attempting to get someone else may throw because sync hasn't occurred or completed + // Only throw if attempting to get themself since the relevant information should be available + if (deviceId === this.#ownDeviceId) throw err + } + + return result } /** * @returns {Promise>} */ async getMany() { - const devices = await this.#dataTypes.deviceInfo.getMany() + const [allCapabilities, allDeviceInfo] = await Promise.all([ + this.#capabilities.getAll(), + this.#dataTypes.deviceInfo.getMany(), + ]) + return Promise.all( - devices.map(async ({ docId, name }) => { - const capabilities = await this.#capabilities.getCapabilities(docId) - return { - deviceId: docId, - name, - capabilities, + Object.entries(allCapabilities).map(async ([deviceId, capabilities]) => { + /** @type {MemberInfo} */ + const memberInfo = { deviceId, capabilities } + + try { + const configCoreId = await this.#coreOwnership.getCoreId( + deviceId, + 'config' + ) + + const deviceInfo = allDeviceInfo.find( + ({ docId }) => docId === configCoreId + ) + + memberInfo.name = deviceInfo?.name + } catch (err) { + // Attempting to get someone else may throw because sync hasn't occurred or completed + // Only throw if attempting to get themself since the relevant information should be available + if (deviceId === this.#ownDeviceId) throw err } + + return memberInfo }) ) } diff --git a/test-e2e/device-info.js b/test-e2e/device-info.js index a122e178a..3a0a7844a 100644 --- a/test-e2e/device-info.js +++ b/test-e2e/device-info.js @@ -1,8 +1,10 @@ import { test } from 'brittle' +import { randomBytes } from 'crypto' import { KeyManager } from '@mapeo/crypto' -import { MapeoManager } from '../src/mapeo-manager.js' import RAM from 'random-access-memory' +import { MapeoManager } from '../src/mapeo-manager.js' + test('write and read deviceInfo', async (t) => { const rootKey = KeyManager.generateRootKey() const manager = new MapeoManager({ @@ -19,3 +21,96 @@ test('write and read deviceInfo', async (t) => { const readInfo2 = await manager.getDeviceInfo() t.alike(readInfo2, info2) }) + +test('device info written to projects', async (t) => { + t.test('when creating project', async (st) => { + const manager = new MapeoManager({ + rootKey: KeyManager.generateRootKey(), + dbFolder: ':memory:', + coreStorage: () => new RAM(), + }) + + await manager.setDeviceInfo({ name: 'mapeo' }) + + const projectId = await manager.createProject() + const project = await manager.getProject(projectId) + + await project.ready() + + const me = await project.$member.getById(project.deviceId) + + st.is(me.deviceId, project.deviceId) + st.alike({ name: me.name }, { name: 'mapeo' }) + }) + + t.test('when adding project', async (st) => { + const manager = new MapeoManager({ + rootKey: KeyManager.generateRootKey(), + dbFolder: ':memory:', + coreStorage: () => new RAM(), + }) + + await manager.setDeviceInfo({ name: 'mapeo' }) + + const projectId = await manager.addProject({ + projectKey: randomBytes(32), + encryptionKeys: { auth: randomBytes(32) }, + }) + + const project = await manager.getProject(projectId) + + await project.ready() + + const me = await project.$member.getById(project.deviceId) + + st.alike({ name: me.name }, { name: 'mapeo' }) + }) + + t.test('after updating global device info', async (st) => { + const manager = new MapeoManager({ + rootKey: KeyManager.generateRootKey(), + dbFolder: ':memory:', + coreStorage: () => new RAM(), + }) + + await manager.setDeviceInfo({ name: 'before' }) + + const projectIds = await Promise.all([ + manager.createProject(), + manager.createProject(), + manager.createProject(), + ]) + + const projects = await Promise.all( + projectIds.map(async (projectId) => { + const project = await manager.getProject(projectId) + await project.ready() + return project + }) + ) + + { + const ownMemberInfos = await Promise.all( + projects.map((p) => p.$member.getById(p.deviceId)) + ) + + for (const info of ownMemberInfos) { + st.alike({ name: info.name }, { name: 'before' }) + } + } + + await manager.setDeviceInfo({ name: 'after' }) + + { + const ownMemberInfos = await Promise.all( + projects.map((p) => p.$member.getById(p.deviceId)) + ) + + for (const info of ownMemberInfos) { + st.alike({ name: info.name }, { name: 'after' }) + } + } + }) + + // TODO: Test closing project, changing name, and getting project to see if device info for project is updated +}) diff --git a/test-e2e/members.js b/test-e2e/members.js new file mode 100644 index 000000000..cbef3fef8 --- /dev/null +++ b/test-e2e/members.js @@ -0,0 +1,207 @@ +import { test } from 'brittle' +import RAM from 'random-access-memory' +import { KeyManager } from '@mapeo/crypto' +import pDefer from 'p-defer' +import { randomBytes } from 'crypto' + +import { MapeoManager, kRPC } from '../src/mapeo-manager.js' +import { + CREATOR_CAPABILITIES, + DEFAULT_CAPABILITIES, + MEMBER_ROLE_ID, + NO_ROLE_CAPABILITIES, +} from '../src/capabilities.js' +import { replicate } from '../tests/helpers/rpc.js' + +test('getting yourself after creating project', async (t) => { + const { manager } = setup() + + await manager.setDeviceInfo({ name: 'mapeo' }) + const project = await manager.getProject(await manager.createProject()) + await project.ready() + + const me = await project.$member.getById(project.deviceId) + + t.alike( + me, + { + deviceId: project.deviceId, + name: 'mapeo', + capabilities: CREATOR_CAPABILITIES, + }, + 'has expected member info with creator capabilities' + ) + + const members = await project.$member.getMany() + + t.is(members.length, 1) + t.alike( + members[0], + { + deviceId: project.deviceId, + name: 'mapeo', + capabilities: CREATOR_CAPABILITIES, + }, + 'has expected member info with creator capabilities' + ) +}) + +test('getting yourself after being invited to project (but not yet synced)', async (t) => { + const { manager } = setup() + + await manager.setDeviceInfo({ name: 'mapeo' }) + const project = await manager.getProject( + await manager.addProject({ + projectKey: randomBytes(32), + encryptionKeys: { auth: randomBytes(32) }, + }) + ) + await project.ready() + + const me = await project.$member.getById(project.deviceId) + + t.alike( + me, + { + deviceId: project.deviceId, + name: 'mapeo', + capabilities: NO_ROLE_CAPABILITIES, + }, + 'has expected member info with no role capabilities' + ) + + const members = await project.$member.getMany() + + t.is(members.length, 1) + t.alike( + members[0], + { + deviceId: project.deviceId, + name: 'mapeo', + capabilities: NO_ROLE_CAPABILITIES, + }, + 'has expected member info with no role capabilities' + ) +}) + +test('getting invited member after invite rejected', async (t) => { + const { manager, simulateMemberInvite } = setup() + + await manager.setDeviceInfo({ name: 'mapeo' }) + const project = await manager.getProject(await manager.createProject()) + await project.ready() + + const invitedDeviceId = await simulateMemberInvite(project, 'reject', { + deviceInfo: { name: 'member' }, + roleId: MEMBER_ROLE_ID, + }) + + await t.exception( + () => project.$member.getById(invitedDeviceId), + 'invited member cannot be retrieved' + ) + + const members = await project.$member.getMany() + + t.is(members.length, 1) + t.absent( + members.find((m) => m.deviceId === invitedDeviceId), + 'invited member not found' + ) +}) + +test('getting invited member after invite accepted', async (t) => { + const { manager, simulateMemberInvite } = setup() + + await manager.setDeviceInfo({ name: 'mapeo' }) + const project = await manager.getProject(await manager.createProject()) + await project.ready() + + const invitedDeviceId = await simulateMemberInvite(project, 'accept', { + deviceInfo: { name: 'member' }, + roleId: MEMBER_ROLE_ID, + }) + + // Before syncing + { + const invitedMember = await project.$member.getById(invitedDeviceId) + + t.alike( + invitedMember, + { + deviceId: invitedDeviceId, + capabilities: DEFAULT_CAPABILITIES[MEMBER_ROLE_ID], + }, + 'has expected member info with member capabilities' + ) + } + + { + const members = await project.$member.getMany() + + t.is(members.length, 2) + + const invitedMember = members.find((m) => m.deviceId === invitedDeviceId) + + t.alike( + invitedMember, + { + deviceId: invitedDeviceId, + capabilities: DEFAULT_CAPABILITIES[MEMBER_ROLE_ID], + }, + 'has expected member info with member capabilities' + ) + } + + // TODO: Test that device info of invited member can be read from invitor after syncing +}) + +function setup() { + const manager = new MapeoManager({ + rootKey: KeyManager.generateRootKey(), + dbFolder: ':memory:', + coreStorage: () => new RAM(), + }) + + /** + * + * @param {import('../src/mapeo-project.js').MapeoProject} project + * @param {'accept' | 'reject'} respondWith + * @param {{ deviceInfo: import('../src/generated/rpc.js').DeviceInfo, roleId: import('../src/capabilities.js').RoleId }} mocked + * + */ + async function simulateMemberInvite( + project, + respondWith, + { deviceInfo, roleId } + ) { + /** @type {import('p-defer').DeferredPromise} */ + const deferred = pDefer() + + const otherManager = new MapeoManager({ + rootKey: KeyManager.generateRootKey(), + dbFolder: ':memory:', + coreStorage: () => new RAM(), + }) + + await otherManager.setDeviceInfo(deviceInfo) + + otherManager.invite.on('invite-received', ({ projectId }) => { + otherManager.invite[respondWith](projectId).catch(deferred.reject) + }) + + manager[kRPC].on('peers', (peers) => { + const deviceId = peers[0].id + project.$member + .invite(deviceId, { roleId }) + .then(() => deferred.resolve(deviceId)) + .catch(deferred.reject) + }) + + replicate(manager[kRPC], otherManager[kRPC]) + + return deferred.promise + } + + return { manager, simulateMemberInvite } +} diff --git a/tests/member-api.js b/tests/member-api.js deleted file mode 100644 index 16a655488..000000000 --- a/tests/member-api.js +++ /dev/null @@ -1,328 +0,0 @@ -import { test } from 'brittle' -import { randomBytes } from 'crypto' -import { KeyManager } from '@mapeo/crypto' -import { MapeoRPC } from '../src/rpc/index.js' -import { MemberApi } from '../src/member-api.js' -import { InviteResponse_Decision } from '../src/generated/rpc.js' -import { - BLOCKED_ROLE_ID, - DEFAULT_CAPABILITIES, - MEMBER_ROLE_ID, -} from '../src/capabilities.js' -import { replicate } from './helpers/rpc.js' - -test('invite() sends expected project-related details', async (t) => { - t.plan(4) - - const { projectKey, encryptionKeys, rpc: r1, capabilities } = setup() - - const projectInfo = createProjectRecord({ name: 'mapeo' }) - const r2 = new MapeoRPC() - - const memberApi = new MemberApi({ - capabilities, - encryptionKeys, - projectKey, - rpc: r1, - dataTypes: { - project: { - async getByDocId() { - return projectInfo - }, - }, - }, - }) - r1.on('peers', async (peers) => { - const response = await memberApi.invite(peers[0].id, { - roleId: MEMBER_ROLE_ID, - }) - - t.is(response, InviteResponse_Decision.ACCEPT) - }) - - r2.on('invite', (peerId, invite) => { - t.alike(invite.projectKey, projectKey) - t.alike(invite.encryptionKeys, encryptionKeys) - t.alike(invite.projectInfo?.name, projectInfo.name) - - r2.inviteResponse(peerId, { - projectKey: invite.projectKey, - decision: InviteResponse_Decision.ACCEPT, - }) - }) - - replicate(r1, r2) -}) - -test('invite() assigns role to invited device after invite accepted', async (t) => { - t.plan(4) - - const { projectKey, encryptionKeys, rpc: r1 } = setup() - - const r2 = new MapeoRPC() - - const expectedRoleId = MEMBER_ROLE_ID - let expectedDeviceId = null - - // We're only testing that this gets called with the expected arguments - const capabilities = { - async assignRole(deviceId, roleId) { - t.ok(expectedDeviceId) - t.is(deviceId, expectedDeviceId) - t.is(roleId, expectedRoleId) - }, - } - - const memberApi = new MemberApi({ - capabilities, - encryptionKeys, - projectKey, - rpc: r1, - dataTypes: { - project: { - async getByDocId() { - return createProjectRecord({ name: 'mapeo' }) - }, - }, - }, - }) - - r1.on('peers', async (peers) => { - expectedDeviceId = peers[0].id - - const response = await memberApi.invite(expectedDeviceId, { - roleId: expectedRoleId, - }) - - t.is(response, InviteResponse_Decision.ACCEPT) - }) - - r2.on('invite', (peerId, invite) => { - r2.inviteResponse(peerId, { - projectKey: invite.projectKey, - decision: InviteResponse_Decision.ACCEPT, - }) - }) - - replicate(r1, r2) -}) - -test('invite() does not assign role to invited device if invite is not accepted', async (t) => { - const nonAcceptInviteDecisions = Object.values( - InviteResponse_Decision - ).filter((d) => d !== InviteResponse_Decision.ACCEPT) - - for (const decision of nonAcceptInviteDecisions) { - t.test(decision, (st) => { - st.plan(1) - - const { projectKey, encryptionKeys, rpc: r1, capabilities } = setup() - - const r2 = new MapeoRPC() - - const capabilitiesSpy = { - ...capabilities, - // This should not be called at any point in this test - async assignRole() { - st.fail( - 'Attempted to assign role despite decision being non-acceptance' - ) - }, - } - - const memberApi = new MemberApi({ - capabilities: capabilitiesSpy, - encryptionKeys, - projectKey, - rpc: r1, - dataTypes: { - project: { - async getByDocId() { - return createProjectRecord({ name: 'mapeo' }) - }, - }, - }, - }) - - r1.on('peers', async (peers) => { - const response = await memberApi.invite(peers[0].id, { - roleId: MEMBER_ROLE_ID, - }) - - st.is(response, decision) - }) - - r2.on('invite', (peerId, invite) => { - r2.inviteResponse(peerId, { - projectKey: invite.projectKey, - decision, - }) - }) - - replicate(r1, r2) - }) - } -}) - -test('getById() works', async (t) => { - const { projectKey, encryptionKeys, rpc, capabilities } = setup() - - const deviceId = randomBytes(32).toString('hex') - - const deviceInfo = createDeviceInfoRecord({ deviceId, name: 'member' }) - - // Pre-populate data - await capabilities.assignRole(deviceId, MEMBER_ROLE_ID) - const deviceInfoRecords = [deviceInfo] - - const memberApi = new MemberApi({ - capabilities, - encryptionKeys, - projectKey, - rpc, - dataTypes: { - deviceInfo: { - async getByDocId(deviceId) { - const info = deviceInfoRecords.find(({ docId }) => docId === deviceId) - if (!info) throw new Error(`No record with ID ${deviceId}`) - return info - }, - }, - }, - }) - - const member = await memberApi.getById(deviceId) - - t.alike( - member, - { - deviceId, - name: deviceInfo.name, - capabilities: DEFAULT_CAPABILITIES[MEMBER_ROLE_ID], - }, - 'returns matching member' - ) - - await t.exception(async () => { - const randomDeviceId = randomBytes(32).toString('hex') - await memberApi.getById(randomDeviceId) - }, 'throws when no match') -}) - -test('getMany() works', async (t) => { - const { projectKey, encryptionKeys, rpc, capabilities } = setup() - - const deviceInfoRecords = [] - - const memberApi = new MemberApi({ - capabilities, - encryptionKeys, - projectKey, - rpc, - queries: { getProjectInfo: async () => {} }, - dataTypes: { - deviceInfo: { - async getMany() { - return deviceInfoRecords - }, - }, - }, - }) - - const initialMembers = await memberApi.getMany() - - t.is(initialMembers.length, 0, 'no initial members') - - // Pre-populate data - for (let i = 0; i < 3; i++) { - const deviceInfo = createDeviceInfoRecord({ name: `member${i + 1}` }) - deviceInfoRecords.push(deviceInfo) - await capabilities.assignRole(deviceInfo.docId, MEMBER_ROLE_ID) - } - - const members = await memberApi.getMany() - - t.is(members.length, 3) - - for (const member of members) { - t.alike(member.capabilities, DEFAULT_CAPABILITIES[MEMBER_ROLE_ID]) - - const deviceInfo = deviceInfoRecords.find( - ({ docId }) => docId === member.deviceId - ) - - t.ok(deviceInfo) - t.is(member.name, deviceInfo?.name) - } -}) - -function setup() { - const projectKey = KeyManager.generateProjectKeypair().publicKey - const encryptionKeys = { auth: randomBytes(32) } - const rpc = new MapeoRPC() - - /** @type {Map} */ - const memberRoles = new Map() - - /** @type {Pick} */ - const capabilities = { - async assignRole(deviceId, role) { - memberRoles.set(deviceId, role) - }, - async getCapabilities(deviceId) { - const roleId = memberRoles.get(deviceId) - return DEFAULT_CAPABILITIES[roleId || BLOCKED_ROLE_ID] - }, - } - - return { - capabilities, - projectKey, - encryptionKeys, - rpc, - } -} - -/** - * @param {Object} opts - * @param {string} [opts.deviceId] - * @param {string} opts.name - * @returns {import('@mapeo/schema').DeviceInfo} - */ -function createDeviceInfoRecord({ deviceId, name }) { - const docId = deviceId || randomBytes(32).toString('hex') - const createdBy = randomBytes(32).toString('hex') - - return { - schemaName: 'deviceInfo', - docId, - name, - versionId: `${docId}/0`, - createdAt: new Date().toISOString(), - updatedAt: new Date().toISOString(), - createdBy, - links: [], - } -} - -/** - * @param {Object} opts - * @param {string} [opts.projectId] - * @param {string} opts.name - * @returns {import('@mapeo/schema').ProjectSettings} - */ -function createProjectRecord({ projectId, name }) { - const docId = projectId || randomBytes(32).toString('hex') - const createdBy = randomBytes(32).toString('hex') - - return { - schemaName: 'projectSettings', - docId, - name, - versionId: `${docId}/0`, - createdAt: new Date().toISOString(), - updatedAt: new Date().toISOString(), - createdBy, - links: [], - } -} From 0ec841d1a6411fd7b5e585634f5d026449d7fe0e Mon Sep 17 00:00:00 2001 From: Andrew Chou Date: Wed, 11 Oct 2023 11:32:32 -0400 Subject: [PATCH 6/8] chore: update @mapeo/crypto to latest (#336) Co-authored-by: Gregor MacLennan --- package-lock.json | 56 ++++++++--------------------------------- package.json | 2 +- src/discovery/mdns.js | 2 +- src/utils.js | 2 +- tests/discovery/mdns.js | 2 +- 5 files changed, 15 insertions(+), 49 deletions(-) diff --git a/package-lock.json b/package-lock.json index a7630653a..aec401d95 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,7 +13,7 @@ "@digidem/types": "^2.1.0", "@fastify/type-provider-typebox": "^3.3.0", "@hyperswarm/secret-stream": "^6.1.2", - "@mapeo/crypto": "^1.0.0-alpha.8", + "@mapeo/crypto": "^1.0.0-alpha.10", "@mapeo/schema": "^3.0.0-next.11", "@mapeo/sqlite-indexer": "^1.0.0-alpha.6", "@sinclair/typebox": "^0.29.6", @@ -806,9 +806,9 @@ "integrity": "sha512-Hcv+nVC0kZnQ3tD9GVu5xSMR4VVYOteQIr/hwFPVEvPdlXqgGEuRjiheChHgdM+JyqdgNcmzZOX/tnl0JOiI7A==" }, "node_modules/@mapeo/crypto": { - "version": "1.0.0-alpha.8", - "resolved": "https://registry.npmjs.org/@mapeo/crypto/-/crypto-1.0.0-alpha.8.tgz", - "integrity": "sha512-2pIykZTFWINwtdsk2Fl3+3UdpQZrt1/IwMgvglwZlxyqKM7LpcuAda871BFiK1CcEsDlaTT63FBtIszEXnnawg==", + "version": "1.0.0-alpha.10", + "resolved": "https://registry.npmjs.org/@mapeo/crypto/-/crypto-1.0.0-alpha.10.tgz", + "integrity": "sha512-TEK8HN1W0XZOOADIMxa4saXtqAZKyBDeVVn3RBCcPaCiOGHeYy43/0rMnBVTbXZCLsLVPnOXwv6vg+vUkasrWQ==", "dependencies": { "@types/b4a": "^1.6.0", "b4a": "^1.6.4", @@ -818,28 +818,11 @@ "compact-encoding-net": "^1.0.1", "compact-encoding-struct": "^1.2.0", "crc": "^3.8.0", - "derive-key": "^1.0.1", "lodash": "^4.17.21", - "sodium-universal": "^3.0.4", + "sodium-universal": "^4.0.0", "z32": "^1.0.0" } }, - "node_modules/@mapeo/crypto/node_modules/sodium-universal": { - "version": "3.1.0", - "license": "MIT", - "dependencies": { - "blake2b": "^2.1.1", - "chacha20-universal": "^1.0.4", - "nanoassert": "^2.0.0", - "resolve": "^1.17.0", - "sha256-universal": "^1.1.0", - "sha512-universal": "^1.1.0", - "siphash24": "^1.0.1", - "sodium-javascript": "~0.8.0", - "sodium-native": "^3.2.0", - "xsalsa20": "^1.0.0" - } - }, "node_modules/@mapeo/schema": { "version": "3.0.0-next.11", "resolved": "https://registry.npmjs.org/@mapeo/schema/-/schema-3.0.0-next.11.tgz", @@ -1583,14 +1566,6 @@ "nanoassert": "^2.0.0" } }, - "node_modules/blake2b-universal": { - "version": "1.0.1", - "license": "MIT", - "dependencies": { - "blake2b": "^2.1.3", - "sodium-native": "^3.0.1" - } - }, "node_modules/blake2b-wasm": { "version": "2.4.0", "license": "MIT", @@ -2443,13 +2418,6 @@ "dev": true, "license": "MIT" }, - "node_modules/derive-key": { - "version": "1.0.1", - "license": "MIT", - "dependencies": { - "blake2b-universal": "^1.0.0" - } - }, "node_modules/detect-libc": { "version": "2.0.1", "license": "Apache-2.0", @@ -3495,6 +3463,7 @@ }, "node_modules/function-bind": { "version": "1.1.1", + "dev": true, "license": "MIT" }, "node_modules/function.prototype.name": { @@ -3749,6 +3718,7 @@ }, "node_modules/has": { "version": "1.0.3", + "dev": true, "license": "MIT", "dependencies": { "function-bind": "^1.1.1" @@ -4153,6 +4123,7 @@ }, "node_modules/is-core-module": { "version": "2.11.0", + "dev": true, "license": "MIT", "dependencies": { "has": "^1.0.3" @@ -6022,6 +5993,7 @@ }, "node_modules/path-parse": { "version": "1.0.7", + "dev": true, "license": "MIT" }, "node_modules/path-scurry": { @@ -6738,6 +6710,7 @@ }, "node_modules/resolve": { "version": "1.22.1", + "dev": true, "license": "MIT", "dependencies": { "is-core-module": "^2.9.0", @@ -7307,14 +7280,6 @@ "xsalsa20": "^1.0.0" } }, - "node_modules/sodium-native": { - "version": "3.4.1", - "hasInstallScript": true, - "license": "MIT", - "dependencies": { - "node-gyp-build": "^4.3.0" - } - }, "node_modules/sodium-secretstream": { "version": "1.1.0", "license": "MIT", @@ -7630,6 +7595,7 @@ }, "node_modules/supports-preserve-symlinks-flag": { "version": "1.0.0", + "dev": true, "license": "MIT", "engines": { "node": ">= 0.4" diff --git a/package.json b/package.json index 6028cd339..d090b1cb5 100644 --- a/package.json +++ b/package.json @@ -111,7 +111,7 @@ "@digidem/types": "^2.1.0", "@fastify/type-provider-typebox": "^3.3.0", "@hyperswarm/secret-stream": "^6.1.2", - "@mapeo/crypto": "^1.0.0-alpha.8", + "@mapeo/crypto": "^1.0.0-alpha.10", "@mapeo/schema": "^3.0.0-next.11", "@mapeo/sqlite-indexer": "^1.0.0-alpha.6", "@sinclair/typebox": "^0.29.6", diff --git a/src/discovery/mdns.js b/src/discovery/mdns.js index 072de699b..6d0810afb 100644 --- a/src/discovery/mdns.js +++ b/src/discovery/mdns.js @@ -7,7 +7,7 @@ import debug from 'debug' import { isPrivate } from 'bogon' import StartStopStateMachine from 'start-stop-state-machine' import pTimeout from 'p-timeout' -import { projectKeyToPublicId as keyToPublicId } from '@mapeo/crypto' +import { keyToPublicId } from '@mapeo/crypto' /** @typedef {{ publicKey: Buffer, secretKey: Buffer }} Keypair */ diff --git a/src/utils.js b/src/utils.js index fe431e71d..92c0568f6 100644 --- a/src/utils.js +++ b/src/utils.js @@ -1,5 +1,5 @@ import b4a from 'b4a' -import { projectKeyToPublicId as keyToPublicId } from '@mapeo/crypto' +import { keyToPublicId } from '@mapeo/crypto' /** * @param {String|Buffer} id diff --git a/tests/discovery/mdns.js b/tests/discovery/mdns.js index f253b39e2..b2cdf3714 100644 --- a/tests/discovery/mdns.js +++ b/tests/discovery/mdns.js @@ -3,7 +3,7 @@ import { randomBytes } from 'node:crypto' import net from 'node:net' import { KeyManager } from '@mapeo/crypto' import { setTimeout as delay } from 'node:timers/promises' -import { projectKeyToPublicId as keyToPublicId } from '@mapeo/crypto' +import { keyToPublicId } from '@mapeo/crypto' import { ERR_DUPLICATE, MdnsDiscovery } from '../../src/discovery/mdns.js' import NoiseSecretStream from '@hyperswarm/secret-stream' From d7e830972b8332b52f2eb9bb9b6079aaf9ba6fc9 Mon Sep 17 00:00:00 2001 From: Andrew Chou Date: Thu, 12 Oct 2023 13:52:04 -0400 Subject: [PATCH 7/8] chore: update better-sqlite3 to 8.7.0 (#337) --- package-lock.json | 9 +++++---- package.json | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/package-lock.json b/package-lock.json index aec401d95..6c95660d7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -19,7 +19,7 @@ "@sinclair/typebox": "^0.29.6", "b4a": "^1.6.3", "base32.js": "^0.1.0", - "better-sqlite3": "^8.3.0", + "better-sqlite3": "^8.7.0", "big-sparse-array": "^1.0.3", "bogon": "^1.1.0", "bonjour-service": "^1.1.1", @@ -1503,12 +1503,13 @@ "license": "MIT" }, "node_modules/better-sqlite3": { - "version": "8.5.0", + "version": "8.7.0", + "resolved": "https://registry.npmjs.org/better-sqlite3/-/better-sqlite3-8.7.0.tgz", + "integrity": "sha512-99jZU4le+f3G6aIl6PmmV0cxUIWqKieHxsiF7G34CVFiE+/UabpYqkU0NJIkY/96mQKikHeBjtR27vFfs5JpEw==", "hasInstallScript": true, - "license": "MIT", "dependencies": { "bindings": "^1.5.0", - "prebuild-install": "^7.1.0" + "prebuild-install": "^7.1.1" } }, "node_modules/big-sparse-array": { diff --git a/package.json b/package.json index d090b1cb5..67fbba205 100644 --- a/package.json +++ b/package.json @@ -117,7 +117,7 @@ "@sinclair/typebox": "^0.29.6", "b4a": "^1.6.3", "base32.js": "^0.1.0", - "better-sqlite3": "^8.3.0", + "better-sqlite3": "^8.7.0", "big-sparse-array": "^1.0.3", "bogon": "^1.1.0", "bonjour-service": "^1.1.1", From 110d6d8086c3d2e3cb052482420fa8f8256946db Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Mon, 16 Oct 2023 18:29:51 +0100 Subject: [PATCH 8/8] feat: NamespaceSyncState improvements (#339) --- src/sync/core-sync-state.js | 12 ++--- src/sync/namespace-sync-state.js | 79 +++++++++++++++++++++++------- tests/helpers/core-manager.js | 11 +++-- tests/helpers/replication-state.js | 13 ----- tests/sync/namespace-sync-state.js | 60 +++++++++++++++++------ 5 files changed, 120 insertions(+), 55 deletions(-) diff --git a/src/sync/core-sync-state.js b/src/sync/core-sync-state.js index 9ca1df587..ea7f57167 100644 --- a/src/sync/core-sync-state.js +++ b/src/sync/core-sync-state.js @@ -16,20 +16,20 @@ import RemoteBitfield, { * @property {Map} remoteStates */ /** - * @typedef {object} PeerSimpleState + * @typedef {object} CoreState * @property {number} have blocks the peer has locally * @property {number} want blocks the peer wants, and at least one peer has * @property {number} wanted blocks the peer has that at least one peer wants * @property {number} missing blocks the peer wants but no peer has */ /** - * @typedef {PeerSimpleState & { connected: boolean }} RemotePeerSimpleState + * @typedef {CoreState & { connected: boolean }} PeerCoreState */ /** * @typedef {object} DerivedState * @property {number} coreLength known (sparse) length of the core - * @property {PeerSimpleState} localState local state - * @property {Record} remoteStates map of state of all known peers + * @property {CoreState} localState local state + * @property {Record} remoteStates map of state of all known peers */ /** @@ -316,7 +316,7 @@ export function deriveState(coreState) { const peerIds = ['local', ...coreState.remoteStates.keys()] const peers = [coreState.localState, ...coreState.remoteStates.values()] - /** @type {PeerSimpleState[]} */ + /** @type {CoreState[]} */ const peerStates = new Array(peers.length) const length = coreState.length || 0 for (let i = 0; i < peerStates.length; i++) { @@ -366,7 +366,7 @@ export function deriveState(coreState) { remoteStates: {}, } for (let j = 1; j < peerStates.length; j++) { - const peerState = /** @type {RemotePeerSimpleState} */ (peerStates[j]) + const peerState = /** @type {PeerCoreState} */ (peerStates[j]) peerState.connected = peers[j].connected derivedState.remoteStates[peerIds[j]] = peerState } diff --git a/src/sync/namespace-sync-state.js b/src/sync/namespace-sync-state.js index 169c42303..5ddb122f6 100644 --- a/src/sync/namespace-sync-state.js +++ b/src/sync/namespace-sync-state.js @@ -2,16 +2,7 @@ import { CoreSyncState } from './core-sync-state.js' import { discoveryKey } from 'hypercore-crypto' /** - * @typedef {object} PeerSyncState - * @property {number} have - * @property {number} want - * @property {number} wanted - * @property {number} missing - */ - -/** - * @typedef {object} SyncState - * @property {PeerSyncState} localState + * @typedef {Omit} SyncState */ /** @@ -22,6 +13,8 @@ export class NamespaceSyncState { #coreStates = new Map() #handleUpdate #namespace + /** @type {SyncState | null} */ + #cachedState = null /** * @param {object} opts @@ -31,7 +24,12 @@ export class NamespaceSyncState { */ constructor({ namespace, coreManager, onUpdate }) { this.#namespace = namespace - this.#handleUpdate = onUpdate + // Called whenever the state changes, so we clear the cache because next + // call to getState() will need to re-derive the state + this.#handleUpdate = () => { + this.#cachedState = null + process.nextTick(onUpdate) + } for (const { core, key } of coreManager.getCores(namespace)) { this.#addCore(core, key) @@ -54,16 +52,25 @@ export class NamespaceSyncState { /** @returns {SyncState} */ getState() { + if (this.#cachedState) return this.#cachedState + /** @type {SyncState} */ const state = { - localState: { have: 0, want: 0, wanted: 0, missing: 0 }, + localState: { want: 0, have: 0, wanted: 0, missing: 0 }, + remoteStates: {}, } - for (const crs of this.#coreStates.values()) { - const { localState } = crs.getState() - state.localState.have += localState.have - state.localState.want += localState.want - state.localState.wanted += localState.wanted - state.localState.missing += localState.missing + for (const css of this.#coreStates.values()) { + const coreState = css.getState() + mutatingAddPeerState(state.localState, coreState.localState) + for (const [peerId, peerCoreState] of Object.entries( + coreState.remoteStates + )) { + if (!(peerId in state.remoteStates)) { + state.remoteStates[peerId] = createPeerState(peerCoreState.connected) + } + mutatingAddPeerState(state.remoteStates[peerId], peerCoreState) + } } + this.#cachedState = state return state } @@ -100,3 +107,39 @@ export class NamespaceSyncState { return coreState } } + +/** @returns {SyncState['remoteStates'][string]} */ +function createPeerState(connected = false) { + return { want: 0, have: 0, wanted: 0, missing: 0, connected } +} + +/** + * @overload + * @param {SyncState['localState']} accumulator + * @param {SyncState['localState']} currentValue + * @returns {SyncState['localState']} + */ + +/** + * @overload + * @param {SyncState['remoteStates'][string]} accumulator + * @param {SyncState['remoteStates'][string]} currentValue + * @returns {SyncState['remoteStates'][string]} + */ + +/** + * Adds peer state in `currentValue` to peer state in `accumulator` + * + * @param {SyncState['remoteStates'][string]} accumulator + * @param {SyncState['remoteStates'][string]} currentValue + */ +function mutatingAddPeerState(accumulator, currentValue) { + accumulator.have += currentValue.have + accumulator.want += currentValue.want + accumulator.wanted += currentValue.wanted + accumulator.missing += currentValue.missing + if ('connected' in accumulator) { + accumulator.connected = accumulator.connected && currentValue.connected + } + return accumulator +} diff --git a/tests/helpers/core-manager.js b/tests/helpers/core-manager.js index 0269c6688..d7b2a5261 100644 --- a/tests/helpers/core-manager.js +++ b/tests/helpers/core-manager.js @@ -31,11 +31,16 @@ export function createCoreManager({ * * @param {CoreManager} cm1 * @param {CoreManager} cm2 + * @param {{ kp1: import('../../src/types.js').KeyPair, kp2: import('../../src/types.js').KeyPair }} [opts] * @returns */ -export function replicate(cm1, cm2) { - const n1 = new NoiseSecretStream(true) - const n2 = new NoiseSecretStream(false) +export function replicate( + cm1, + cm2, + { kp1 = NoiseSecretStream.keyPair(), kp2 = NoiseSecretStream.keyPair() } = {} +) { + const n1 = new NoiseSecretStream(true, undefined, { keyPair: kp1 }) + const n2 = new NoiseSecretStream(false, undefined, { keyPair: kp2 }) n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream) const rsm1 = cm1.replicate(n1) diff --git a/tests/helpers/replication-state.js b/tests/helpers/replication-state.js index 19f1e430f..f74c188e4 100644 --- a/tests/helpers/replication-state.js +++ b/tests/helpers/replication-state.js @@ -1,5 +1,3 @@ -import NoiseSecretStream from '@hyperswarm/secret-stream' - import { truncateId } from '../../src/utils.js' export function logState(syncState, name) { @@ -58,14 +56,3 @@ export async function downloadCore( const core = coreManager.getCoreByKey(key) await core.download({ start, end, ifAvailable: true }).done() } - -export function replicate(cm1, cm2) { - const n1 = new NoiseSecretStream(true) - const n2 = new NoiseSecretStream(false) - n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream) - - cm1.replicate(n1) - cm2.replicate(n2) - - return { syncStream1: n1, syncStream2: n2 } -} diff --git a/tests/sync/namespace-sync-state.js b/tests/sync/namespace-sync-state.js index 091f33fe6..e14a34a53 100644 --- a/tests/sync/namespace-sync-state.js +++ b/tests/sync/namespace-sync-state.js @@ -6,20 +6,32 @@ import { createCoreManager, waitForCores, getKeys, + replicate, } from '../helpers/core-manager.js' -import { replicate } from '../helpers/replication-state.js' +import { randomBytes } from 'crypto' test('sync cores in a namespace', async function (t) { t.plan(2) const projectKeyPair = KeyManager.generateProjectKeypair() + const rootKey1 = randomBytes(16) + const rootKey2 = randomBytes(16) + const km1 = new KeyManager(rootKey1) + const km2 = new KeyManager(rootKey2) const cm1 = createCoreManager({ + rootKey: rootKey1, projectKey: projectKeyPair.publicKey, projectSecretKey: projectKeyPair.secretKey, }) - const cm2 = createCoreManager({ projectKey: projectKeyPair.publicKey }) + const cm2 = createCoreManager({ + rootKey: rootKey2, + projectKey: projectKeyPair.publicKey, + }) - replicate(cm1, cm2) + replicate(cm1, cm2, { + kp1: km1.getIdentityKeypair(), + kp2: km2.getIdentityKeypair(), + }) await Promise.all([ waitForCores(cm1, getKeys(cm2, 'auth')), @@ -33,15 +45,24 @@ test('sync cores in a namespace', async function (t) { coreManager: cm1, namespace: 'auth', onUpdate: () => { - const { localState } = syncState1.getState() + const state = syncState1.getState() if ( - localState.want === 0 && - localState.wanted === 0 && - localState.have === 30 && - localState.missing === 10 && + state.localState.want === 0 && + state.localState.wanted === 0 && + state.localState.have === 30 && + state.localState.missing === 10 && !syncState1Synced ) { - t.pass('syncState1 is synced') + const expected = { + [km2.getIdentityKeypair().publicKey.toString('hex')]: { + want: 0, + wanted: 0, + have: 30, + missing: 10, + connected: true, + }, + } + t.alike(state.remoteStates, expected, 'syncState1 is synced') syncState1Synced = true } }, @@ -51,15 +72,24 @@ test('sync cores in a namespace', async function (t) { coreManager: cm2, namespace: 'auth', onUpdate: () => { - const { localState } = syncState2.getState() + const state = syncState2.getState() if ( - localState.want === 0 && - localState.wanted === 0 && - localState.have === 30 && - localState.missing === 10 && + state.localState.want === 0 && + state.localState.wanted === 0 && + state.localState.have === 30 && + state.localState.missing === 10 && !syncState2Synced ) { - t.pass('syncState2 is synced') + const expected = { + [km1.getIdentityKeypair().publicKey.toString('hex')]: { + want: 0, + wanted: 0, + have: 30, + missing: 10, + connected: true, + }, + } + t.alike(state.remoteStates, expected, 'syncState2 is synced') syncState2Synced = true } },