Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: NamespaceSyncState improvements #339

Merged
merged 4 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/sync/core-sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@ import RemoteBitfield, {
* @property {Map<PeerId, PeerState>} remoteStates
*/
/**
* @typedef {object} PeerSimpleState
* @typedef {object} CoreState
* @property {number} have blocks the peer has locally
* @property {number} want blocks the peer wants, and at least one peer has
* @property {number} wanted blocks the peer has that at least one peer wants
* @property {number} missing blocks the peer wants but no peer has
*/
/**
* @typedef {PeerSimpleState & { connected: boolean }} RemotePeerSimpleState
* @typedef {CoreState & { connected: boolean }} PeerCoreState
*/
/**
* @typedef {object} DerivedState
* @property {number} coreLength known (sparse) length of the core
* @property {PeerSimpleState} localState local state
* @property {Record<PeerId, RemotePeerSimpleState>} remoteStates map of state of all known peers
* @property {CoreState} localState local state
* @property {Record<PeerId, PeerCoreState>} remoteStates map of state of all known peers
*/

/**
Expand Down Expand Up @@ -316,7 +316,7 @@ export function deriveState(coreState) {
const peerIds = ['local', ...coreState.remoteStates.keys()]
const peers = [coreState.localState, ...coreState.remoteStates.values()]

/** @type {PeerSimpleState[]} */
/** @type {CoreState[]} */
const peerStates = new Array(peers.length)
const length = coreState.length || 0
for (let i = 0; i < peerStates.length; i++) {
Expand Down Expand Up @@ -366,7 +366,7 @@ export function deriveState(coreState) {
remoteStates: {},
}
for (let j = 1; j < peerStates.length; j++) {
const peerState = /** @type {RemotePeerSimpleState} */ (peerStates[j])
const peerState = /** @type {PeerCoreState} */ (peerStates[j])
peerState.connected = peers[j].connected
derivedState.remoteStates[peerIds[j]] = peerState
}
Expand Down
78 changes: 60 additions & 18 deletions src/sync/namespace-sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,7 @@ import { CoreSyncState } from './core-sync-state.js'
import { discoveryKey } from 'hypercore-crypto'

/**
* @typedef {object} PeerSyncState
* @property {number} have
* @property {number} want
* @property {number} wanted
* @property {number} missing
*/

/**
* @typedef {object} SyncState
* @property {PeerSyncState} localState
* @typedef {Omit<import('./core-sync-state.js').DerivedState, 'coreLength'>} SyncState
*/

/**
Expand All @@ -22,6 +13,8 @@ export class NamespaceSyncState {
#coreStates = new Map()
#handleUpdate
#namespace
/** @type {SyncState | null} */
#cachedState = null

/**
* @param {object} opts
Expand All @@ -31,7 +24,12 @@ export class NamespaceSyncState {
*/
constructor({ namespace, coreManager, onUpdate }) {
this.#namespace = namespace
this.#handleUpdate = onUpdate
// Called whenever the state changes, so we clear the cache because next
// call to getState() will need to re-derive the state
this.#handleUpdate = () => {
this.#cachedState = null
process.nextTick(onUpdate)
}

for (const { core, key } of coreManager.getCores(namespace)) {
this.#addCore(core, key)
Expand All @@ -54,15 +52,23 @@ export class NamespaceSyncState {

/** @returns {SyncState} */
getState() {
if (this.#cachedState) return this.#cachedState
/** @type {SyncState} */
const state = {
localState: { have: 0, want: 0, wanted: 0, missing: 0 },
localState: { want: 0, have: 0, wanted: 0, missing: 0 },
remoteStates: {},
}
for (const crs of this.#coreStates.values()) {
const { localState } = crs.getState()
state.localState.have += localState.have
state.localState.want += localState.want
state.localState.wanted += localState.wanted
state.localState.missing += localState.missing
for (const css of this.#coreStates.values()) {
const coreState = css.getState()
mutatingAddPeerState(state.localState, coreState.localState)
for (const [peerId, peerCoreState] of Object.entries(
coreState.remoteStates
)) {
if (!(peerId in state.remoteStates)) {
state.remoteStates[peerId] = createPeerState(peerCoreState.connected)
}
mutatingAddPeerState(state.remoteStates[peerId], peerCoreState)
}
}
return state
}
gmaclennan marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -100,3 +106,39 @@ export class NamespaceSyncState {
return coreState
}
}

/** @returns {SyncState['remoteStates'][string]} */
function createPeerState(connected = false) {
return { want: 0, have: 0, wanted: 0, missing: 0, connected }
}

/**
* @overload
* @param {SyncState['localState']} accumulator
* @param {SyncState['localState']} currentValue
* @returns {SyncState['localState']}
*/

/**
* @overload
* @param {SyncState['remoteStates'][string]} accumulator
* @param {SyncState['remoteStates'][string]} currentValue
* @returns {SyncState['remoteStates'][string]}
*/

/**
* Adds peer state in `currentValue` to peer state in `accumulator`
*
* @param {SyncState['remoteStates'][string]} accumulator
* @param {SyncState['remoteStates'][string]} currentValue
*/
function mutatingAddPeerState(accumulator, currentValue) {
accumulator.have += currentValue.have
accumulator.want += currentValue.want
accumulator.wanted += currentValue.wanted
accumulator.missing += currentValue.missing
if ('connected' in accumulator) {
accumulator.connected = accumulator.connected && currentValue.connected
}
return accumulator
}
15 changes: 12 additions & 3 deletions tests/helpers/core-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,20 @@ export function createCoreManager({
*
* @param {CoreManager} cm1
* @param {CoreManager} cm2
* @param {{ kp1: import('../../src/types.js').KeyPair, kp2: import('../../src/types.js').KeyPair }} [opts]
* @returns
*/
export function replicate(cm1, cm2) {
const n1 = new NoiseSecretStream(true)
const n2 = new NoiseSecretStream(false)
export function replicate(
cm1,
cm2,
{
// Keep keypairs deterministic for tests, since we use peer.publicKey as an identifier.
kp1 = NoiseSecretStream.keyPair(Buffer.allocUnsafe(32).fill(0)),
kp2 = NoiseSecretStream.keyPair(Buffer.allocUnsafe(32).fill(1)),
} = {}
) {
const n1 = new NoiseSecretStream(true, undefined, { keyPair: kp1 })
const n2 = new NoiseSecretStream(false, undefined, { keyPair: kp2 })
n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream)

const rsm1 = cm1.replicate(n1)
Expand Down
13 changes: 0 additions & 13 deletions tests/helpers/replication-state.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import NoiseSecretStream from '@hyperswarm/secret-stream'

import { truncateId } from '../../src/utils.js'

export function logState(syncState, name) {
Expand Down Expand Up @@ -58,14 +56,3 @@ export async function downloadCore(
const core = coreManager.getCoreByKey(key)
await core.download({ start, end, ifAvailable: true }).done()
}

export function replicate(cm1, cm2) {
const n1 = new NoiseSecretStream(true)
const n2 = new NoiseSecretStream(false)
n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream)

cm1.replicate(n1)
cm2.replicate(n2)

return { syncStream1: n1, syncStream2: n2 }
}
60 changes: 45 additions & 15 deletions tests/sync/namespace-sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,32 @@ import {
createCoreManager,
waitForCores,
getKeys,
replicate,
} from '../helpers/core-manager.js'
import { replicate } from '../helpers/replication-state.js'
import { randomBytes } from 'crypto'

test('sync cores in a namespace', async function (t) {
t.plan(2)
const projectKeyPair = KeyManager.generateProjectKeypair()
const rootKey1 = randomBytes(16)
const rootKey2 = randomBytes(16)
const km1 = new KeyManager(rootKey1)
const km2 = new KeyManager(rootKey2)

const cm1 = createCoreManager({
rootKey: rootKey1,
projectKey: projectKeyPair.publicKey,
projectSecretKey: projectKeyPair.secretKey,
})
const cm2 = createCoreManager({ projectKey: projectKeyPair.publicKey })
const cm2 = createCoreManager({
rootKey: rootKey2,
projectKey: projectKeyPair.publicKey,
})

replicate(cm1, cm2)
replicate(cm1, cm2, {
kp1: km1.getIdentityKeypair(),
kp2: km2.getIdentityKeypair(),
})

await Promise.all([
waitForCores(cm1, getKeys(cm2, 'auth')),
Expand All @@ -33,15 +45,24 @@ test('sync cores in a namespace', async function (t) {
coreManager: cm1,
namespace: 'auth',
onUpdate: () => {
const { localState } = syncState1.getState()
const state = syncState1.getState()
if (
localState.want === 0 &&
localState.wanted === 0 &&
localState.have === 30 &&
localState.missing === 10 &&
state.localState.want === 0 &&
state.localState.wanted === 0 &&
state.localState.have === 30 &&
state.localState.missing === 10 &&
!syncState1Synced
) {
t.pass('syncState1 is synced')
const expected = {
[km2.getIdentityKeypair().publicKey.toString('hex')]: {
want: 0,
wanted: 0,
have: 30,
missing: 10,
connected: true,
},
}
t.alike(state.remoteStates, expected, 'syncState1 is synced')
syncState1Synced = true
}
},
Expand All @@ -51,15 +72,24 @@ test('sync cores in a namespace', async function (t) {
coreManager: cm2,
namespace: 'auth',
onUpdate: () => {
const { localState } = syncState2.getState()
const state = syncState2.getState()
if (
localState.want === 0 &&
localState.wanted === 0 &&
localState.have === 30 &&
localState.missing === 10 &&
state.localState.want === 0 &&
state.localState.wanted === 0 &&
state.localState.have === 30 &&
state.localState.missing === 10 &&
!syncState2Synced
) {
t.pass('syncState2 is synced')
const expected = {
[km1.getIdentityKeypair().publicKey.toString('hex')]: {
want: 0,
wanted: 0,
have: 30,
missing: 10,
connected: true,
},
}
t.alike(state.remoteStates, expected, 'syncState2 is synced')
syncState2Synced = true
}
},
Expand Down