From 110d6d8086c3d2e3cb052482420fa8f8256946db Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Mon, 16 Oct 2023 18:29:51 +0100 Subject: [PATCH] feat: NamespaceSyncState improvements (#339) --- src/sync/core-sync-state.js | 12 ++--- src/sync/namespace-sync-state.js | 79 +++++++++++++++++++++++------- tests/helpers/core-manager.js | 11 +++-- tests/helpers/replication-state.js | 13 ----- tests/sync/namespace-sync-state.js | 60 +++++++++++++++++------ 5 files changed, 120 insertions(+), 55 deletions(-) diff --git a/src/sync/core-sync-state.js b/src/sync/core-sync-state.js index 9ca1df587..ea7f57167 100644 --- a/src/sync/core-sync-state.js +++ b/src/sync/core-sync-state.js @@ -16,20 +16,20 @@ import RemoteBitfield, { * @property {Map} remoteStates */ /** - * @typedef {object} PeerSimpleState + * @typedef {object} CoreState * @property {number} have blocks the peer has locally * @property {number} want blocks the peer wants, and at least one peer has * @property {number} wanted blocks the peer has that at least one peer wants * @property {number} missing blocks the peer wants but no peer has */ /** - * @typedef {PeerSimpleState & { connected: boolean }} RemotePeerSimpleState + * @typedef {CoreState & { connected: boolean }} PeerCoreState */ /** * @typedef {object} DerivedState * @property {number} coreLength known (sparse) length of the core - * @property {PeerSimpleState} localState local state - * @property {Record} remoteStates map of state of all known peers + * @property {CoreState} localState local state + * @property {Record} remoteStates map of state of all known peers */ /** @@ -316,7 +316,7 @@ export function deriveState(coreState) { const peerIds = ['local', ...coreState.remoteStates.keys()] const peers = [coreState.localState, ...coreState.remoteStates.values()] - /** @type {PeerSimpleState[]} */ + /** @type {CoreState[]} */ const peerStates = new Array(peers.length) const length = coreState.length || 0 for (let i = 0; i < peerStates.length; i++) { @@ -366,7 +366,7 @@ export function deriveState(coreState) { remoteStates: {}, } for (let j = 1; j < peerStates.length; j++) { - const peerState = /** @type {RemotePeerSimpleState} */ (peerStates[j]) + const peerState = /** @type {PeerCoreState} */ (peerStates[j]) peerState.connected = peers[j].connected derivedState.remoteStates[peerIds[j]] = peerState } diff --git a/src/sync/namespace-sync-state.js b/src/sync/namespace-sync-state.js index 169c42303..5ddb122f6 100644 --- a/src/sync/namespace-sync-state.js +++ b/src/sync/namespace-sync-state.js @@ -2,16 +2,7 @@ import { CoreSyncState } from './core-sync-state.js' import { discoveryKey } from 'hypercore-crypto' /** - * @typedef {object} PeerSyncState - * @property {number} have - * @property {number} want - * @property {number} wanted - * @property {number} missing - */ - -/** - * @typedef {object} SyncState - * @property {PeerSyncState} localState + * @typedef {Omit} SyncState */ /** @@ -22,6 +13,8 @@ export class NamespaceSyncState { #coreStates = new Map() #handleUpdate #namespace + /** @type {SyncState | null} */ + #cachedState = null /** * @param {object} opts @@ -31,7 +24,12 @@ export class NamespaceSyncState { */ constructor({ namespace, coreManager, onUpdate }) { this.#namespace = namespace - this.#handleUpdate = onUpdate + // Called whenever the state changes, so we clear the cache because next + // call to getState() will need to re-derive the state + this.#handleUpdate = () => { + this.#cachedState = null + process.nextTick(onUpdate) + } for (const { core, key } of coreManager.getCores(namespace)) { this.#addCore(core, key) @@ -54,16 +52,25 @@ export class NamespaceSyncState { /** @returns {SyncState} */ getState() { + if (this.#cachedState) return this.#cachedState + /** @type {SyncState} */ const state = { - localState: { have: 0, want: 0, wanted: 0, missing: 0 }, + localState: { want: 0, have: 0, wanted: 0, missing: 0 }, + remoteStates: {}, } - for (const crs of this.#coreStates.values()) { - const { localState } = crs.getState() - state.localState.have += localState.have - state.localState.want += localState.want - state.localState.wanted += localState.wanted - state.localState.missing += localState.missing + for (const css of this.#coreStates.values()) { + const coreState = css.getState() + mutatingAddPeerState(state.localState, coreState.localState) + for (const [peerId, peerCoreState] of Object.entries( + coreState.remoteStates + )) { + if (!(peerId in state.remoteStates)) { + state.remoteStates[peerId] = createPeerState(peerCoreState.connected) + } + mutatingAddPeerState(state.remoteStates[peerId], peerCoreState) + } } + this.#cachedState = state return state } @@ -100,3 +107,39 @@ export class NamespaceSyncState { return coreState } } + +/** @returns {SyncState['remoteStates'][string]} */ +function createPeerState(connected = false) { + return { want: 0, have: 0, wanted: 0, missing: 0, connected } +} + +/** + * @overload + * @param {SyncState['localState']} accumulator + * @param {SyncState['localState']} currentValue + * @returns {SyncState['localState']} + */ + +/** + * @overload + * @param {SyncState['remoteStates'][string]} accumulator + * @param {SyncState['remoteStates'][string]} currentValue + * @returns {SyncState['remoteStates'][string]} + */ + +/** + * Adds peer state in `currentValue` to peer state in `accumulator` + * + * @param {SyncState['remoteStates'][string]} accumulator + * @param {SyncState['remoteStates'][string]} currentValue + */ +function mutatingAddPeerState(accumulator, currentValue) { + accumulator.have += currentValue.have + accumulator.want += currentValue.want + accumulator.wanted += currentValue.wanted + accumulator.missing += currentValue.missing + if ('connected' in accumulator) { + accumulator.connected = accumulator.connected && currentValue.connected + } + return accumulator +} diff --git a/tests/helpers/core-manager.js b/tests/helpers/core-manager.js index 0269c6688..d7b2a5261 100644 --- a/tests/helpers/core-manager.js +++ b/tests/helpers/core-manager.js @@ -31,11 +31,16 @@ export function createCoreManager({ * * @param {CoreManager} cm1 * @param {CoreManager} cm2 + * @param {{ kp1: import('../../src/types.js').KeyPair, kp2: import('../../src/types.js').KeyPair }} [opts] * @returns */ -export function replicate(cm1, cm2) { - const n1 = new NoiseSecretStream(true) - const n2 = new NoiseSecretStream(false) +export function replicate( + cm1, + cm2, + { kp1 = NoiseSecretStream.keyPair(), kp2 = NoiseSecretStream.keyPair() } = {} +) { + const n1 = new NoiseSecretStream(true, undefined, { keyPair: kp1 }) + const n2 = new NoiseSecretStream(false, undefined, { keyPair: kp2 }) n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream) const rsm1 = cm1.replicate(n1) diff --git a/tests/helpers/replication-state.js b/tests/helpers/replication-state.js index 19f1e430f..f74c188e4 100644 --- a/tests/helpers/replication-state.js +++ b/tests/helpers/replication-state.js @@ -1,5 +1,3 @@ -import NoiseSecretStream from '@hyperswarm/secret-stream' - import { truncateId } from '../../src/utils.js' export function logState(syncState, name) { @@ -58,14 +56,3 @@ export async function downloadCore( const core = coreManager.getCoreByKey(key) await core.download({ start, end, ifAvailable: true }).done() } - -export function replicate(cm1, cm2) { - const n1 = new NoiseSecretStream(true) - const n2 = new NoiseSecretStream(false) - n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream) - - cm1.replicate(n1) - cm2.replicate(n2) - - return { syncStream1: n1, syncStream2: n2 } -} diff --git a/tests/sync/namespace-sync-state.js b/tests/sync/namespace-sync-state.js index 091f33fe6..e14a34a53 100644 --- a/tests/sync/namespace-sync-state.js +++ b/tests/sync/namespace-sync-state.js @@ -6,20 +6,32 @@ import { createCoreManager, waitForCores, getKeys, + replicate, } from '../helpers/core-manager.js' -import { replicate } from '../helpers/replication-state.js' +import { randomBytes } from 'crypto' test('sync cores in a namespace', async function (t) { t.plan(2) const projectKeyPair = KeyManager.generateProjectKeypair() + const rootKey1 = randomBytes(16) + const rootKey2 = randomBytes(16) + const km1 = new KeyManager(rootKey1) + const km2 = new KeyManager(rootKey2) const cm1 = createCoreManager({ + rootKey: rootKey1, projectKey: projectKeyPair.publicKey, projectSecretKey: projectKeyPair.secretKey, }) - const cm2 = createCoreManager({ projectKey: projectKeyPair.publicKey }) + const cm2 = createCoreManager({ + rootKey: rootKey2, + projectKey: projectKeyPair.publicKey, + }) - replicate(cm1, cm2) + replicate(cm1, cm2, { + kp1: km1.getIdentityKeypair(), + kp2: km2.getIdentityKeypair(), + }) await Promise.all([ waitForCores(cm1, getKeys(cm2, 'auth')), @@ -33,15 +45,24 @@ test('sync cores in a namespace', async function (t) { coreManager: cm1, namespace: 'auth', onUpdate: () => { - const { localState } = syncState1.getState() + const state = syncState1.getState() if ( - localState.want === 0 && - localState.wanted === 0 && - localState.have === 30 && - localState.missing === 10 && + state.localState.want === 0 && + state.localState.wanted === 0 && + state.localState.have === 30 && + state.localState.missing === 10 && !syncState1Synced ) { - t.pass('syncState1 is synced') + const expected = { + [km2.getIdentityKeypair().publicKey.toString('hex')]: { + want: 0, + wanted: 0, + have: 30, + missing: 10, + connected: true, + }, + } + t.alike(state.remoteStates, expected, 'syncState1 is synced') syncState1Synced = true } }, @@ -51,15 +72,24 @@ test('sync cores in a namespace', async function (t) { coreManager: cm2, namespace: 'auth', onUpdate: () => { - const { localState } = syncState2.getState() + const state = syncState2.getState() if ( - localState.want === 0 && - localState.wanted === 0 && - localState.have === 30 && - localState.missing === 10 && + state.localState.want === 0 && + state.localState.wanted === 0 && + state.localState.have === 30 && + state.localState.missing === 10 && !syncState2Synced ) { - t.pass('syncState2 is synced') + const expected = { + [km1.getIdentityKeypair().publicKey.toString('hex')]: { + want: 0, + wanted: 0, + have: 30, + missing: 10, + connected: true, + }, + } + t.alike(state.remoteStates, expected, 'syncState2 is synced') syncState2Synced = true } },