diff --git a/src/sync/core-sync-state.js b/src/sync/core-sync-state.js index ca7a0ab22..b96b241de 100644 --- a/src/sync/core-sync-state.js +++ b/src/sync/core-sync-state.js @@ -155,7 +155,7 @@ export class CoreSyncState { * @param {Uint32Array} bitfield */ insertPreHaves(peerId, start, bitfield) { - const peerState = this.#getPeerState(peerId) + const peerState = this.#getOrCreatePeerState(peerId) peerState.insertPreHaves(start, bitfield) const previousLength = Math.max( this.#preHavesLength, @@ -185,7 +185,7 @@ export class CoreSyncState { * @param {Array<{ start: number, length: number }>} ranges */ setPeerWants(peerId, ranges) { - const peerState = this.#getPeerState(peerId) + const peerState = this.#getOrCreatePeerState(peerId) for (const { start, length } of ranges) { peerState.setWantRange({ start, length }) } @@ -204,7 +204,17 @@ export class CoreSyncState { /** * @param {PeerId} peerId */ - #getPeerState(peerId) { + disconnectPeer(peerId) { + const wasRemoved = this.#remoteStates.delete(peerId) + if (wasRemoved) { + this.#update() + } + } + + /** + * @param {PeerId} peerId + */ + #getOrCreatePeerState(peerId) { let peerState = this.#remoteStates.get(peerId) if (!peerState) { peerState = new PeerState() @@ -224,7 +234,7 @@ export class CoreSyncState { const peerId = keyToId(peer.remotePublicKey) // Update state to ensure this peer is in the state correctly - const peerState = this.#getPeerState(peerId) + const peerState = this.#getOrCreatePeerState(peerId) peerState.status = 'starting' this.#core?.update({ wait: true }).then(() => { @@ -260,7 +270,8 @@ export class CoreSyncState { */ #onPeerRemove = (peer) => { const peerId = keyToId(peer.remotePublicKey) - const peerState = this.#getPeerState(peerId) + const peerState = this.#remoteStates.get(peerId) + if (!peerState) return peerState.status = 'stopped' this.#update() } diff --git a/src/sync/namespace-sync-state.js b/src/sync/namespace-sync-state.js index 234a199db..699f90094 100644 --- a/src/sync/namespace-sync-state.js +++ b/src/sync/namespace-sync-state.js @@ -105,6 +105,15 @@ export class NamespaceSyncState { } } + /** + * @param {string} peerId + */ + disconnectPeer(peerId) { + for (const css of this.#coreStates.values()) { + css.disconnectPeer(peerId) + } + } + /** * @param {import('hypercore')<"binary", Buffer>} core * @param {Buffer} coreKey diff --git a/src/sync/sync-api.js b/src/sync/sync-api.js index a1738cb34..00a0cd29b 100644 --- a/src/sync/sync-api.js +++ b/src/sync/sync-api.js @@ -105,7 +105,7 @@ export class SyncApi extends TypedEmitter { }) this.#coreManager.creatorCore.on('peer-add', this.#handlePeerAdd) - this.#coreManager.creatorCore.on('peer-remove', this.#handlePeerRemove) + this.#coreManager.creatorCore.on('peer-remove', this.#handlePeerDisconnect) roles.on('update', this.#handleRoleUpdate) coreOwnership.on('update', this.#handleCoreOwnershipUpdate) @@ -406,7 +406,7 @@ export class SyncApi extends TypedEmitter { * * @param {{ protomux: import('protomux'), remotePublicKey: Buffer }} peer */ - #handlePeerRemove = (peer) => { + #handlePeerDisconnect = (peer) => { const { protomux } = peer if (!this.#peerSyncControllers.has(protomux)) { this.#l.log( @@ -419,6 +419,8 @@ export class SyncApi extends TypedEmitter { const peerId = keyToId(peer.remotePublicKey) this.#pscByPeerId.delete(peerId) this.#pendingDiscoveryKeys.delete(protomux) + this[kSyncState].disconnectPeer(peerId) + this.#updateState() } /** diff --git a/src/sync/sync-state.js b/src/sync/sync-state.js index 26e450687..7e64836a0 100644 --- a/src/sync/sync-state.js +++ b/src/sync/sync-state.js @@ -49,6 +49,15 @@ export class SyncState extends TypedEmitter { } } + /** + * @param {string} peerId + */ + disconnectPeer(peerId) { + for (const nss of Object.values(this.#syncStates)) { + nss.disconnectPeer(peerId) + } + } + /** * @returns {State} */ diff --git a/test-e2e/sync.js b/test-e2e/sync.js index 47da3bc58..5a6bfdc0f 100644 --- a/test-e2e/sync.js +++ b/test-e2e/sync.js @@ -1109,3 +1109,98 @@ test('data sync state is properly updated as data sync is enabled and disabled', 'other invitee is still disabled, still wants something' ) }) + +test( + 'Sync state with disconnected peer (disconnected peer wants)', + { timeout: 100_000 }, + async (t) => { + // 1. Connect to a peer, invite it + // 2. Disconnect from the peer + // 3. Connect to a new peer, invite it + // 4. Wait for initial sync with new peer + // 5. Sync should complete with new peer + + const managers = await createManagers(3, t) + const [invitor, inviteeA, inviteeB] = managers + const disconnectA = connectPeers([invitor, inviteeA], { discovery: false }) + const projectId = await invitor.createProject({ name: 'Mapeo' }) + await invite({ invitor, invitees: [inviteeA], projectId }) + + const [invitorProject, inviteeAProject] = await Promise.all( + [invitor, inviteeA].map((m) => m.getProject(projectId)) + ) + + await Promise.all( + [invitorProject, inviteeAProject].map((p) => + p.$sync.waitForSync('initial') + ) + ) + + await disconnectA() + + const disconnectB = connectPeers([invitor, inviteeB], { discovery: false }) + await invite({ invitor, invitees: [inviteeB], projectId }) + await pTimeout(invitorProject.$sync.waitForSync('initial'), { + milliseconds: 1000, + message: 'invitor should complete initial sync with inviteeB', + }) + + await disconnectB() + } +) + +test( + 'Sync state with disconnected peer (want data from peer)', + { timeout: 100_000 }, + async (t) => { + // 1. Connect to two peers, invite them + // 2. One peer adds an observation, does not sync it + // 3. Wait until other two peers "know" about that observation + // 4. Disconnect peer that added observation + // 5. Attempt to sync remaining connected peers + // 6. Sync should complete with remaining connected peer + + const managers = await createManagers(3, t) + const [invitor, inviteeA, inviteeB] = managers + const disconnectA = connectPeers([invitor, inviteeA], { discovery: false }) + const disconnectB = connectPeers([invitor, inviteeB], { discovery: false }) + const projectId = await invitor.createProject({ name: 'Mapeo' }) + await invite({ invitor, invitees: [inviteeA, inviteeB], projectId }) + + const [invitorProject, inviteeAProject, inviteeBProject] = + await Promise.all( + [invitor, inviteeA, inviteeB].map((m) => m.getProject(projectId)) + ) + + await inviteeAProject.observation.create( + valueOf(generate('observation')[0]) + ) + + await Promise.all( + [invitorProject, inviteeAProject].map((p) => + p.$sync.waitForSync('initial') + ) + ) + + // Need to wait for pre-have of inviteeA observation to be received + await new Promise((res) => { + invitorProject.$sync[kSyncState].on('state', function onState(state) { + if (state.data.localState.want === 1) { + invitorProject.$sync[kSyncState].removeListener('state', onState) + res(void 0) + } + }) + }) + await disconnectA() + + invitorProject.$sync.start() + inviteeBProject.$sync.start() + + await pTimeout(invitorProject.$sync.waitForSync('full'), { + milliseconds: 1000, + message: 'invitor should complete full sync with inviteeB', + }) + + await disconnectB() + } +)