Skip to content

Commit

Permalink
feat: NamespaceSyncState improvements (#339)
Browse files Browse the repository at this point in the history
  • Loading branch information
gmaclennan authored Oct 16, 2023
1 parent d7e8309 commit 110d6d8
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 55 deletions.
12 changes: 6 additions & 6 deletions src/sync/core-sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@ import RemoteBitfield, {
* @property {Map<PeerId, PeerState>} remoteStates
*/
/**
* @typedef {object} PeerSimpleState
* @typedef {object} CoreState
* @property {number} have blocks the peer has locally
* @property {number} want blocks the peer wants, and at least one peer has
* @property {number} wanted blocks the peer has that at least one peer wants
* @property {number} missing blocks the peer wants but no peer has
*/
/**
* @typedef {PeerSimpleState & { connected: boolean }} RemotePeerSimpleState
* @typedef {CoreState & { connected: boolean }} PeerCoreState
*/
/**
* @typedef {object} DerivedState
* @property {number} coreLength known (sparse) length of the core
* @property {PeerSimpleState} localState local state
* @property {Record<PeerId, RemotePeerSimpleState>} remoteStates map of state of all known peers
* @property {CoreState} localState local state
* @property {Record<PeerId, PeerCoreState>} remoteStates map of state of all known peers
*/

/**
Expand Down Expand Up @@ -316,7 +316,7 @@ export function deriveState(coreState) {
const peerIds = ['local', ...coreState.remoteStates.keys()]
const peers = [coreState.localState, ...coreState.remoteStates.values()]

/** @type {PeerSimpleState[]} */
/** @type {CoreState[]} */
const peerStates = new Array(peers.length)
const length = coreState.length || 0
for (let i = 0; i < peerStates.length; i++) {
Expand Down Expand Up @@ -366,7 +366,7 @@ export function deriveState(coreState) {
remoteStates: {},
}
for (let j = 1; j < peerStates.length; j++) {
const peerState = /** @type {RemotePeerSimpleState} */ (peerStates[j])
const peerState = /** @type {PeerCoreState} */ (peerStates[j])
peerState.connected = peers[j].connected
derivedState.remoteStates[peerIds[j]] = peerState
}
Expand Down
79 changes: 61 additions & 18 deletions src/sync/namespace-sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,7 @@ import { CoreSyncState } from './core-sync-state.js'
import { discoveryKey } from 'hypercore-crypto'

/**
* @typedef {object} PeerSyncState
* @property {number} have
* @property {number} want
* @property {number} wanted
* @property {number} missing
*/

/**
* @typedef {object} SyncState
* @property {PeerSyncState} localState
* @typedef {Omit<import('./core-sync-state.js').DerivedState, 'coreLength'>} SyncState
*/

/**
Expand All @@ -22,6 +13,8 @@ export class NamespaceSyncState {
#coreStates = new Map()
#handleUpdate
#namespace
/** @type {SyncState | null} */
#cachedState = null

/**
* @param {object} opts
Expand All @@ -31,7 +24,12 @@ export class NamespaceSyncState {
*/
constructor({ namespace, coreManager, onUpdate }) {
this.#namespace = namespace
this.#handleUpdate = onUpdate
// Called whenever the state changes, so we clear the cache because next
// call to getState() will need to re-derive the state
this.#handleUpdate = () => {
this.#cachedState = null
process.nextTick(onUpdate)
}

for (const { core, key } of coreManager.getCores(namespace)) {
this.#addCore(core, key)
Expand All @@ -54,16 +52,25 @@ export class NamespaceSyncState {

/** @returns {SyncState} */
getState() {
if (this.#cachedState) return this.#cachedState
/** @type {SyncState} */
const state = {
localState: { have: 0, want: 0, wanted: 0, missing: 0 },
localState: { want: 0, have: 0, wanted: 0, missing: 0 },
remoteStates: {},
}
for (const crs of this.#coreStates.values()) {
const { localState } = crs.getState()
state.localState.have += localState.have
state.localState.want += localState.want
state.localState.wanted += localState.wanted
state.localState.missing += localState.missing
for (const css of this.#coreStates.values()) {
const coreState = css.getState()
mutatingAddPeerState(state.localState, coreState.localState)
for (const [peerId, peerCoreState] of Object.entries(
coreState.remoteStates
)) {
if (!(peerId in state.remoteStates)) {
state.remoteStates[peerId] = createPeerState(peerCoreState.connected)
}
mutatingAddPeerState(state.remoteStates[peerId], peerCoreState)
}
}
this.#cachedState = state
return state
}

Expand Down Expand Up @@ -100,3 +107,39 @@ export class NamespaceSyncState {
return coreState
}
}

/** @returns {SyncState['remoteStates'][string]} */
function createPeerState(connected = false) {
return { want: 0, have: 0, wanted: 0, missing: 0, connected }
}

/**
* @overload
* @param {SyncState['localState']} accumulator
* @param {SyncState['localState']} currentValue
* @returns {SyncState['localState']}
*/

/**
* @overload
* @param {SyncState['remoteStates'][string]} accumulator
* @param {SyncState['remoteStates'][string]} currentValue
* @returns {SyncState['remoteStates'][string]}
*/

/**
* Adds peer state in `currentValue` to peer state in `accumulator`
*
* @param {SyncState['remoteStates'][string]} accumulator
* @param {SyncState['remoteStates'][string]} currentValue
*/
function mutatingAddPeerState(accumulator, currentValue) {
accumulator.have += currentValue.have
accumulator.want += currentValue.want
accumulator.wanted += currentValue.wanted
accumulator.missing += currentValue.missing
if ('connected' in accumulator) {
accumulator.connected = accumulator.connected && currentValue.connected
}
return accumulator
}
11 changes: 8 additions & 3 deletions tests/helpers/core-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,16 @@ export function createCoreManager({
*
* @param {CoreManager} cm1
* @param {CoreManager} cm2
* @param {{ kp1: import('../../src/types.js').KeyPair, kp2: import('../../src/types.js').KeyPair }} [opts]
* @returns
*/
export function replicate(cm1, cm2) {
const n1 = new NoiseSecretStream(true)
const n2 = new NoiseSecretStream(false)
export function replicate(
cm1,
cm2,
{ kp1 = NoiseSecretStream.keyPair(), kp2 = NoiseSecretStream.keyPair() } = {}
) {
const n1 = new NoiseSecretStream(true, undefined, { keyPair: kp1 })
const n2 = new NoiseSecretStream(false, undefined, { keyPair: kp2 })
n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream)

const rsm1 = cm1.replicate(n1)
Expand Down
13 changes: 0 additions & 13 deletions tests/helpers/replication-state.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import NoiseSecretStream from '@hyperswarm/secret-stream'

import { truncateId } from '../../src/utils.js'

export function logState(syncState, name) {
Expand Down Expand Up @@ -58,14 +56,3 @@ export async function downloadCore(
const core = coreManager.getCoreByKey(key)
await core.download({ start, end, ifAvailable: true }).done()
}

export function replicate(cm1, cm2) {
const n1 = new NoiseSecretStream(true)
const n2 = new NoiseSecretStream(false)
n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream)

cm1.replicate(n1)
cm2.replicate(n2)

return { syncStream1: n1, syncStream2: n2 }
}
60 changes: 45 additions & 15 deletions tests/sync/namespace-sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,32 @@ import {
createCoreManager,
waitForCores,
getKeys,
replicate,
} from '../helpers/core-manager.js'
import { replicate } from '../helpers/replication-state.js'
import { randomBytes } from 'crypto'

test('sync cores in a namespace', async function (t) {
t.plan(2)
const projectKeyPair = KeyManager.generateProjectKeypair()
const rootKey1 = randomBytes(16)
const rootKey2 = randomBytes(16)
const km1 = new KeyManager(rootKey1)
const km2 = new KeyManager(rootKey2)

const cm1 = createCoreManager({
rootKey: rootKey1,
projectKey: projectKeyPair.publicKey,
projectSecretKey: projectKeyPair.secretKey,
})
const cm2 = createCoreManager({ projectKey: projectKeyPair.publicKey })
const cm2 = createCoreManager({
rootKey: rootKey2,
projectKey: projectKeyPair.publicKey,
})

replicate(cm1, cm2)
replicate(cm1, cm2, {
kp1: km1.getIdentityKeypair(),
kp2: km2.getIdentityKeypair(),
})

await Promise.all([
waitForCores(cm1, getKeys(cm2, 'auth')),
Expand All @@ -33,15 +45,24 @@ test('sync cores in a namespace', async function (t) {
coreManager: cm1,
namespace: 'auth',
onUpdate: () => {
const { localState } = syncState1.getState()
const state = syncState1.getState()
if (
localState.want === 0 &&
localState.wanted === 0 &&
localState.have === 30 &&
localState.missing === 10 &&
state.localState.want === 0 &&
state.localState.wanted === 0 &&
state.localState.have === 30 &&
state.localState.missing === 10 &&
!syncState1Synced
) {
t.pass('syncState1 is synced')
const expected = {
[km2.getIdentityKeypair().publicKey.toString('hex')]: {
want: 0,
wanted: 0,
have: 30,
missing: 10,
connected: true,
},
}
t.alike(state.remoteStates, expected, 'syncState1 is synced')
syncState1Synced = true
}
},
Expand All @@ -51,15 +72,24 @@ test('sync cores in a namespace', async function (t) {
coreManager: cm2,
namespace: 'auth',
onUpdate: () => {
const { localState } = syncState2.getState()
const state = syncState2.getState()
if (
localState.want === 0 &&
localState.wanted === 0 &&
localState.have === 30 &&
localState.missing === 10 &&
state.localState.want === 0 &&
state.localState.wanted === 0 &&
state.localState.have === 30 &&
state.localState.missing === 10 &&
!syncState2Synced
) {
t.pass('syncState2 is synced')
const expected = {
[km1.getIdentityKeypair().publicKey.toString('hex')]: {
want: 0,
wanted: 0,
have: 30,
missing: 10,
connected: true,
},
}
t.alike(state.remoteStates, expected, 'syncState2 is synced')
syncState2Synced = true
}
},
Expand Down

0 comments on commit 110d6d8

Please sign in to comment.