Skip to content

Commit

Permalink
Merge branch 'feat/peer-sync-controller' into feat/sync-controller
Browse files Browse the repository at this point in the history
* feat/peer-sync-controller:
  WIP: fix up tests and start addressing bugs
  • Loading branch information
gmaclennan committed Oct 24, 2023
2 parents 8c834a0 + 49dad05 commit 314c0ed
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 14 deletions.
16 changes: 9 additions & 7 deletions src/sync/peer-sync-controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,22 @@ export class PeerSyncController {
#syncStatus = createSyncStatusObject()
/** @type {Map<import('hypercore')<'binary', any>, ReturnType<import('hypercore')['download']>>} */
#downloadingRanges = new Map()
/** @type {SyncStatus | undefined} */
#prevSyncStatus

/**
* @param {object} opts
* @param {import("protomux")<import('../types.js').NoiseStream>} opts.protomux
* @param {import("../core-manager/index.js").CoreManager} opts.coreManager
* @param {import("./sync-state.js").SyncState} opts.syncState
* @param {import("../capabilities.js").Capabilities} opts.capabilities
* @param {string} opts.peerId device id for the peer: the device public key as a hex-encoded string
*/
constructor({ protomux, coreManager, syncState, capabilities }) {
constructor({ protomux, coreManager, syncState, capabilities, peerId }) {
this.#coreManager = coreManager
this.#protomux = protomux
this.#capabilities = capabilities
if (!protomux.stream.remotePublicKey) {
throw new Error(
'Unitialized NoiseSecretStream: Protomux stream does not have `remotePublicKey`'
)
}
this.#peerId = protomux.stream.remotePublicKey.toString('hex')
this.#peerId = peerId

// Always need to replicate the project creator core
coreManager.creatorCore.replicate(protomux)
Expand Down Expand Up @@ -105,6 +103,7 @@ export class PeerSyncController {
return [ns, this.#prevLocalState[ns].have !== localState[ns].have]
})
this.#prevLocalState = localState
this.#prevSyncStatus = this.#syncStatus

if (didUpdate.auth && this.#syncStatus.auth === 'synced') {
try {
Expand All @@ -115,6 +114,9 @@ export class PeerSyncController {
this.#syncCapability = createSyncCapabilityObject('blocked')
}
}
console.log('sync status', this.#peerId, this.#syncStatus)
console.log('cap', this.#syncCapability)
console.log('state', state.auth)

// If any namespace has new data, update what is enabled
if (Object.values(didUpdate).indexOf(true) > -1) {
Expand Down
2 changes: 1 addition & 1 deletion src/sync/sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class SyncState extends TypedEmitter {
return state
}

#handleUpdate() {
#handleUpdate = () => {
this.emit('state', this.getState())
}
}
42 changes: 36 additions & 6 deletions tests/sync/peer-sync-controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import { KeyManager } from '@mapeo/crypto'
import { once } from 'node:events'
import { setTimeout } from 'node:timers/promises'
import { NAMESPACES } from '../../src/core-manager/index.js'
import { SyncState } from '../../src/sync/sync-state.js'
import { CREATOR_CAPABILITIES } from '../../src/capabilities.js'

test('creator core is always replicated', async (t) => {
test.solo('creator core is always replicated', async (t) => {
const {
coreManagers: [cm1, cm2],
} = await testenv()
Expand All @@ -24,18 +26,25 @@ test('creator core is always replicated', async (t) => {

for (const namespace of NAMESPACES) {
const cm1NamespaceCores = cm1.getCores(namespace)
t.is(cm1NamespaceCores.length, 1, 'each namespace has one core')
t.is(
cm1NamespaceCores.length,
namespace === 'auth' ? 2 : 1,
'each namespace apart from auth has one core'
)
const cm2NamespaceCores = cm2.getCores(namespace)
t.is(
cm2NamespaceCores.length,
namespace === 'auth' ? 2 : 1,
'each namespace apart from auth has one core'
)
for (const { core, key } of [...cm1NamespaceCores, ...cm2NamespaceCores]) {
if (key.equals(cm1.creatorCore.key)) {
t.is(core.peers.length, 1, 'Creator cores has one peer')
for (const { core, namespace } of [
...cm1NamespaceCores,
...cm2NamespaceCores,
]) {
if (namespace === 'auth') {
t.is(core.peers.length, 1, 'Auth cores has one peer')
} else {
t.is(core.peers.length, 0, 'non-creator cores have no peers')
t.is(core.peers.length, 0, 'non-auth cores have no peers')
}
}
}
Expand Down Expand Up @@ -144,13 +153,34 @@ async function testenv() {
})
stream1.pipe(stream2).pipe(stream1)

await Promise.all([
once(stream1.noiseStream, 'connect'),
once(stream2.noiseStream, 'connect'),
])

const psc1 = new PeerSyncController({
protomux: stream1.noiseStream.userData,
coreManager: cm1,
syncState: new SyncState({ coreManager: cm1 }),
// @ts-expect-error
capabilities: {
async getCapabilities() {
return CREATOR_CAPABILITIES
},
},
peerId: stream1.noiseStream.remotePublicKey.toString('hex'),
})
const psc2 = new PeerSyncController({
protomux: stream2.noiseStream.userData,
coreManager: cm2,
syncState: new SyncState({ coreManager: cm2 }),
// @ts-expect-error
capabilities: {
async getCapabilities() {
return CREATOR_CAPABILITIES
},
},
peerId: stream2.noiseStream.remotePublicKey.toString('hex'),
})

return {
Expand Down

0 comments on commit 314c0ed

Please sign in to comment.