Skip to content

Commit

Permalink
fix: sync state with disconnected peers (take 2) (#891)
Browse files Browse the repository at this point in the history
* chore: add failing tests for sync state when peers disconnect

* fix: Fix sync state with disconnected peers

* remove superflous log statement
  • Loading branch information
gmaclennan authored Oct 4, 2024
1 parent 9270c62 commit 301a082
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 7 deletions.
21 changes: 16 additions & 5 deletions src/sync/core-sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ export class CoreSyncState {
* @param {Uint32Array} bitfield
*/
insertPreHaves(peerId, start, bitfield) {
const peerState = this.#getPeerState(peerId)
const peerState = this.#getOrCreatePeerState(peerId)
peerState.insertPreHaves(start, bitfield)
const previousLength = Math.max(
this.#preHavesLength,
Expand Down Expand Up @@ -185,7 +185,7 @@ export class CoreSyncState {
* @param {Array<{ start: number, length: number }>} ranges
*/
setPeerWants(peerId, ranges) {
const peerState = this.#getPeerState(peerId)
const peerState = this.#getOrCreatePeerState(peerId)
for (const { start, length } of ranges) {
peerState.setWantRange({ start, length })
}
Expand All @@ -204,7 +204,17 @@ export class CoreSyncState {
/**
* @param {PeerId} peerId
*/
#getPeerState(peerId) {
disconnectPeer(peerId) {
const wasRemoved = this.#remoteStates.delete(peerId)
if (wasRemoved) {
this.#update()
}
}

/**
* @param {PeerId} peerId
*/
#getOrCreatePeerState(peerId) {
let peerState = this.#remoteStates.get(peerId)
if (!peerState) {
peerState = new PeerState()
Expand All @@ -224,7 +234,7 @@ export class CoreSyncState {
const peerId = keyToId(peer.remotePublicKey)

// Update state to ensure this peer is in the state correctly
const peerState = this.#getPeerState(peerId)
const peerState = this.#getOrCreatePeerState(peerId)
peerState.status = 'starting'

this.#core?.update({ wait: true }).then(() => {
Expand Down Expand Up @@ -260,7 +270,8 @@ export class CoreSyncState {
*/
#onPeerRemove = (peer) => {
const peerId = keyToId(peer.remotePublicKey)
const peerState = this.#getPeerState(peerId)
const peerState = this.#remoteStates.get(peerId)
if (!peerState) return
peerState.status = 'stopped'
this.#update()
}
Expand Down
9 changes: 9 additions & 0 deletions src/sync/namespace-sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,15 @@ export class NamespaceSyncState {
}
}

/**
* @param {string} peerId
*/
disconnectPeer(peerId) {
for (const css of this.#coreStates.values()) {
css.disconnectPeer(peerId)
}
}

/**
* @param {import('hypercore')<"binary", Buffer>} core
* @param {Buffer} coreKey
Expand Down
6 changes: 4 additions & 2 deletions src/sync/sync-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ export class SyncApi extends TypedEmitter {
})

this.#coreManager.creatorCore.on('peer-add', this.#handlePeerAdd)
this.#coreManager.creatorCore.on('peer-remove', this.#handlePeerRemove)
this.#coreManager.creatorCore.on('peer-remove', this.#handlePeerDisconnect)

roles.on('update', this.#handleRoleUpdate)
coreOwnership.on('update', this.#handleCoreOwnershipUpdate)
Expand Down Expand Up @@ -406,7 +406,7 @@ export class SyncApi extends TypedEmitter {
*
* @param {{ protomux: import('protomux')<import('@hyperswarm/secret-stream')>, remotePublicKey: Buffer }} peer
*/
#handlePeerRemove = (peer) => {
#handlePeerDisconnect = (peer) => {
const { protomux } = peer
if (!this.#peerSyncControllers.has(protomux)) {
this.#l.log(
Expand All @@ -419,6 +419,8 @@ export class SyncApi extends TypedEmitter {
const peerId = keyToId(peer.remotePublicKey)
this.#pscByPeerId.delete(peerId)
this.#pendingDiscoveryKeys.delete(protomux)
this[kSyncState].disconnectPeer(peerId)
this.#updateState()
}

/**
Expand Down
9 changes: 9 additions & 0 deletions src/sync/sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ export class SyncState extends TypedEmitter {
}
}

/**
* @param {string} peerId
*/
disconnectPeer(peerId) {
for (const nss of Object.values(this.#syncStates)) {
nss.disconnectPeer(peerId)
}
}

/**
* @returns {State}
*/
Expand Down
95 changes: 95 additions & 0 deletions test-e2e/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -1109,3 +1109,98 @@ test('data sync state is properly updated as data sync is enabled and disabled',
'other invitee is still disabled, still wants something'
)
})

test(
'Sync state with disconnected peer (disconnected peer wants)',
{ timeout: 100_000 },
async (t) => {
// 1. Connect to a peer, invite it
// 2. Disconnect from the peer
// 3. Connect to a new peer, invite it
// 4. Wait for initial sync with new peer
// 5. Sync should complete with new peer

const managers = await createManagers(3, t)
const [invitor, inviteeA, inviteeB] = managers
const disconnectA = connectPeers([invitor, inviteeA], { discovery: false })
const projectId = await invitor.createProject({ name: 'Mapeo' })
await invite({ invitor, invitees: [inviteeA], projectId })

const [invitorProject, inviteeAProject] = await Promise.all(
[invitor, inviteeA].map((m) => m.getProject(projectId))
)

await Promise.all(
[invitorProject, inviteeAProject].map((p) =>
p.$sync.waitForSync('initial')
)
)

await disconnectA()

const disconnectB = connectPeers([invitor, inviteeB], { discovery: false })
await invite({ invitor, invitees: [inviteeB], projectId })
await pTimeout(invitorProject.$sync.waitForSync('initial'), {
milliseconds: 1000,
message: 'invitor should complete initial sync with inviteeB',
})

await disconnectB()
}
)

test(
'Sync state with disconnected peer (want data from peer)',
{ timeout: 100_000 },
async (t) => {
// 1. Connect to two peers, invite them
// 2. One peer adds an observation, does not sync it
// 3. Wait until other two peers "know" about that observation
// 4. Disconnect peer that added observation
// 5. Attempt to sync remaining connected peers
// 6. Sync should complete with remaining connected peer

const managers = await createManagers(3, t)
const [invitor, inviteeA, inviteeB] = managers
const disconnectA = connectPeers([invitor, inviteeA], { discovery: false })
const disconnectB = connectPeers([invitor, inviteeB], { discovery: false })
const projectId = await invitor.createProject({ name: 'Mapeo' })
await invite({ invitor, invitees: [inviteeA, inviteeB], projectId })

const [invitorProject, inviteeAProject, inviteeBProject] =
await Promise.all(
[invitor, inviteeA, inviteeB].map((m) => m.getProject(projectId))
)

await inviteeAProject.observation.create(
valueOf(generate('observation')[0])
)

await Promise.all(
[invitorProject, inviteeAProject].map((p) =>
p.$sync.waitForSync('initial')
)
)

// Need to wait for pre-have of inviteeA observation to be received
await new Promise((res) => {
invitorProject.$sync[kSyncState].on('state', function onState(state) {
if (state.data.localState.want === 1) {
invitorProject.$sync[kSyncState].removeListener('state', onState)
res(void 0)
}
})
})
await disconnectA()

invitorProject.$sync.start()
inviteeBProject.$sync.start()

await pTimeout(invitorProject.$sync.waitForSync('full'), {
milliseconds: 1000,
message: 'invitor should complete full sync with inviteeB',
})

await disconnectB()
}
)

0 comments on commit 301a082

Please sign in to comment.