diff --git a/src/lib/hypercore-helpers.js b/src/lib/hypercore-helpers.js new file mode 100644 index 000000000..3c96b1f58 --- /dev/null +++ b/src/lib/hypercore-helpers.js @@ -0,0 +1,18 @@ +import { assert } from '../utils.js' + +/** + * @param {import('hypercore')<'binary', any>} core Core to unreplicate. Must be ready. + * @param {import('protomux')} protomux + */ +export function unreplicate(core, protomux) { + assert(core.discoveryKey, 'Core should have a discovery key') + protomux.unpair({ + protocol: 'hypercore/alpha', + id: core.discoveryKey, + }) + for (const channel of protomux) { + if (channel.protocol !== 'hypercore/alpha') continue + if (!channel.id.equals(core.discoveryKey)) continue + channel.close() + } +} diff --git a/src/sync/peer-sync-controller.js b/src/sync/peer-sync-controller.js index b5dff6b1f..ca10d72dd 100644 --- a/src/sync/peer-sync-controller.js +++ b/src/sync/peer-sync-controller.js @@ -2,6 +2,7 @@ import mapObject from 'map-obj' import { NAMESPACES, PRESYNC_NAMESPACES } from '../constants.js' import { Logger } from '../logger.js' import { ExhaustivenessError, createMap } from '../utils.js' +import { unreplicate } from '../lib/hypercore-helpers.js' /** @import { CoreRecord } from '../core-manager/index.js' */ /** @import { Role } from '../roles.js' */ /** @import { SyncEnabledState } from './sync-api.js' */ @@ -255,16 +256,22 @@ export class PeerSyncController { /** * @param {import('hypercore')<'binary', any>} core + * @returns {Promise} */ - #unreplicateCore(core) { + async #unreplicateCore(core) { if (core === this.#coreManager.creatorCore) return - const peerToUnreplicate = core.peers.find( - (peer) => peer.protomux === this.#protomux - ) - if (!peerToUnreplicate) return - this.#log('unreplicating core %k', core.key) - peerToUnreplicate.channel.close() + this.#replicatingCores.delete(core) + + const isCoreReady = Boolean(core.discoveryKey) + if (!isCoreReady) { + await core.ready() + const wasReEnabledWhileWaiting = this.#replicatingCores.has(core) + if (wasReEnabledWhileWaiting) return + } + + unreplicate(core, this.#protomux) + this.#log('unreplicated core %k', core.key) } /** diff --git a/test-e2e/sync-fuzz.js b/test-e2e/sync-fuzz.js new file mode 100644 index 000000000..35c01c6f8 --- /dev/null +++ b/test-e2e/sync-fuzz.js @@ -0,0 +1,348 @@ +/** + * @overview Randomly performs sync operations until something breaks. + * + * This is run as part of normal testing. You can also run it with custom + * arguments like this: + * + * ```sh + * COMAPEO_SYNC_FUZZ_TEST_COUNT=10 \ + * COMAPEO_SYNC_FUZZ_MIN_MANAGER_COUNT=2 \ + * COMAPEO_SYNC_FUZZ_MAX_MANAGER_COUNT=3 \ + * COMAPEO_SYNC_FUZZ_MIN_ACTION_COUNT=4 \ + * COMAPEO_SYNC_FUZZ_MAX_ACTION_COUNT=32 \ + * node --test test-e2e/sync-fuzz.js + * ``` + */ + +import { generate } from '@mapeo/mock-data' +import { map } from 'iterpal' +import assert from 'node:assert/strict' +import { randomInt } from 'node:crypto' +import * as process from 'node:process' +import test from 'node:test' +import { setTimeout as delay } from 'node:timers/promises' +import { isDeepStrictEqual } from 'node:util' +import { valueOf } from '../src/utils.js' +import { + connectPeers, + createManagers, + invite, + sample, + setAdd, + waitForSync, +} from './utils.js' +/** @import { MapeoProject } from '../src/mapeo-project.js' */ + +/** + * @internal + * @typedef {object} ProjectState + * @prop {boolean} isSyncEnabled + * @prop {Readonly>} observationIds + */ + +/** + * @internal + * @typedef {ReadonlyArray} State + */ + +/** + * @internal + * @typedef {object} ActionResult + * @prop {string} title + * @prop {State} newExpectedState + */ + +/** + * @internal + * @callback Action + * @param {State} oldExpectedState + * @returns {ActionResult | Promise} + */ + +test('sync fuzz tests', { concurrency: true, timeout: 2 ** 30 }, async (t) => { + const testCount = getEnvironmentVariableInt('TEST_COUNT', 10) + const minManagerCount = getEnvironmentVariableInt('MIN_MANAGER_COUNT', 2) + const maxManagerCount = getEnvironmentVariableInt('MAX_MANAGER_COUNT', 3) + const minActionCount = getEnvironmentVariableInt('MIN_ACTION_COUNT', 4) + const maxActionCount = getEnvironmentVariableInt('MAX_ACTION_COUNT', 32) + assert( + minManagerCount <= maxManagerCount, + 'min manager count is greater than max. Test is not set up correctly' + ) + assert( + minActionCount <= maxActionCount, + 'min action count is greater than max. Test is not set up correctly' + ) + + for (let i = 1; i <= testCount; i++) { + await t.test( + `fuzz test #${i}`, + { concurrency: true, timeout: 120_000 }, + async (t) => { + const managerCount = randomInt(minManagerCount, maxManagerCount + 1) + const actionCount = randomInt(minActionCount, maxActionCount + 1) + + const managers = await createManagers(managerCount, t) + const [invitor, ...invitees] = managers + + const disconnect = connectPeers(managers, { discovery: false }) + t.after(disconnect) + + const projectId = await invitor.createProject({ name: 'Mapeo' }) + await invite({ invitor, invitees, projectId }) + + const projects = await Promise.all( + managers.map((m) => m.getProject(projectId)) + ) + t.after(() => + Promise.all( + projects.map(async (project) => { + project.$sync.stop() + await project.close() + }) + ) + ) + await waitForSync(projects, 'initial') + + /** @type {string[]} */ + const actionTitles = [] + /** @type {State} */ + let expectedState = projects.map(() => ({ + isSyncEnabled: false, + observationIds: new Set(), + })) + + for (let i = 0; i < actionCount; i++) { + const possibleActions = getPossibleNextActions(projects) + const action = sample(possibleActions) + assert(action, 'no next step? test is broken') + + const result = await action(expectedState) + actionTitles.push(result.title) + expectedState = result.newExpectedState + + await waitForStateToMatch(projects, expectedState, actionTitles) + } + } + ) + } +}) + +/** + * @param {string} name + * @param {number} defaultValue + * @returns {number} + */ +function getEnvironmentVariableInt(name, defaultValue) { + const fullName = 'COMAPEO_SYNC_FUZZ_' + name + const rawValue = process.env[fullName] + if (!rawValue) return defaultValue + + const result = parseInt(rawValue, 10) + assert(result > 0, `${fullName} must be positive`) + assert(Number.isFinite(result), `Can't parse ${fullName}`) + assert(Number.isSafeInteger(result), `${fullName} must be a safe integer`) + + return result +} + +/** + * @param {MapeoProject} project + * @returns {boolean} + */ +function isSyncEnabled(project) { + return project.$sync.getState().data.isSyncEnabled +} + +/** + * @param {MapeoProject} project + * @param {number} index + * @returns {Action[]} + */ +function getPossibleNextActionsForProject(project, index) { + /** @type {Action[]} */ + const result = [] + + // Add observation + + result.push(async (expectedState) => { + const observation = await project.observation.create( + valueOf(generate('observation')[0]) + ) + + const myProject = expectedState[index] + return { + title: `Project ${index} added observation ${observation.docId}`, + newExpectedState: expectedState.map((otherProject) => { + const shouldAddThisObservationToList = + myProject === otherProject || + (myProject.isSyncEnabled && otherProject.isSyncEnabled) + return { + ...otherProject, + observationIds: shouldAddThisObservationToList + ? setAdd(otherProject.observationIds, observation.docId) + : otherProject.observationIds, + } + }), + } + }) + + // Start or stop sync + + if (isSyncEnabled(project)) { + result.push(async (expectedState) => { + project.$sync.stop() + await delay(10) + + const myProject = expectedState[index] + return { + title: `Project ${index} stopped sync`, + newExpectedState: expectedState.map((otherProject) => { + if (otherProject === myProject) { + return { ...otherProject, isSyncEnabled: false } + } else { + return otherProject + } + }), + } + }) + } else { + result.push(async (expectedState) => { + project.$sync.start() + await delay(10) + + const myProject = expectedState[index] + return { + title: `Project ${index} started sync`, + newExpectedState: expectedState.map((otherProject) => { + let { observationIds } = myProject + if (myProject === otherProject) { + for (const p of expectedState) { + if (p.isSyncEnabled) { + observationIds = setAdd(observationIds, ...p.observationIds) + } + } + return { ...myProject, isSyncEnabled: true, observationIds } + } else if (otherProject.isSyncEnabled) { + return { + ...otherProject, + observationIds: setAdd( + otherProject.observationIds, + ...observationIds + ), + } + } else { + return otherProject + } + }), + } + }) + } + + return result +} + +/** + * @param {ReadonlyArray} projects + * @returns {Action[]} + */ +function getPossibleNextActions(projects) { + return projects.flatMap(getPossibleNextActionsForProject) +} + +/** + * @param {MapeoProject[]} projects + * @param {State} expectedState + * @returns {Promise} + */ +async function doesStateMatch(projects, expectedState) { + const results = await Promise.all( + projects.map(async (project, index) => { + const expectedProjectState = expectedState[index] + assert(expectedProjectState, 'test not set up correctly') + + const actualSyncEnabled = isSyncEnabled(project) + const expectedSyncEnabled = expectedProjectState.isSyncEnabled + if (expectedSyncEnabled !== actualSyncEnabled) return false + + const observations = await project.observation.getMany() + const actualObservationIds = new Set(map(observations, (o) => o.docId)) + const expectedObservationIds = expectedProjectState.observationIds + return isDeepStrictEqual(expectedObservationIds, actualObservationIds) + }) + ) + return results.every(Boolean) +} + +/** + * @param {MapeoProject[]} projects + * @param {State} expectedState + * @param {ReadonlyArray} actionTitles + * @returns {Promise} + */ +function waitForStateToMatch(projects, expectedState, actionTitles) { + return new Promise((resolve, reject) => { + const timeout = setTimeout(async () => { + const actualState = await Promise.all( + projects.map(async (project) => { + const observations = await project.observation.getMany() + const observationIds = new Set(map(observations, (o) => o.docId)) + return { + isSyncEnabled: isSyncEnabled(project), + observationIds, + } + }) + ) + + const lines = [ + 'Expected states to be strictly equal after the following steps, but timed out:', + '', + ...actionTitles, + '', + 'Expected the following:', + ...expectedState.flatMap(assertionLinesForProject), + '', + 'Actually got the following:', + ...actualState.flatMap(assertionLinesForProject), + '', + ] + reject(new Error(lines.join('\n'))) + }, 5000) + + const removeListeners = () => { + for (const project of projects) { + project.$sync.off('sync-state', onState) + } + } + + const onState = async () => { + if (await doesStateMatch(projects, expectedState)) { + removeListeners() + clearTimeout(timeout) + resolve() + } + } + + for (const project of projects) { + project.$sync.on('sync-state', onState) + } + + onState().catch(reject) + }) +} + +/** + * @param {ProjectState} projectState + * @param {number} index + * @returns {string[]} + */ +function assertionLinesForProject({ isSyncEnabled, observationIds }, index) { + return [ + `Project ${index} (sync ${isSyncEnabled ? 'started' : 'stopped'}): ${ + observationIds.size + } observation${observationIds.size === 1 ? '' : 's'}`, + ...Array.from(observationIds) + .sort() + .map((observationId) => ` ${observationId}`), + ] +} diff --git a/test-e2e/sync.js b/test-e2e/sync.js index ea21fc9a3..d8dafa2ab 100644 --- a/test-e2e/sync.js +++ b/test-e2e/sync.js @@ -236,6 +236,60 @@ test('start and stop sync', async function (t) { await disconnect() }) +test('sync only happens if both sides are enabled', async (t) => { + const managers = await createManagers(2, t) + const [invitor, ...invitees] = managers + + const disconnect = connectPeers(managers, { discovery: false }) + t.after(disconnect) + + const projectId = await invitor.createProject({ name: 'Mapeo' }) + await invite({ invitor, invitees, projectId }) + + const projects = await Promise.all( + managers.map((m) => m.getProject(projectId)) + ) + t.after(() => Promise.all(projects.map((project) => project.close()))) + const [invitorProject, inviteeProject] = projects + + const generatedObservations = generate('observation', { count: 3 }) + + const obs1 = await invitorProject.observation.create( + valueOf(generatedObservations[0]) + ) + const obs2 = await inviteeProject.observation.create( + valueOf(generatedObservations[1]) + ) + + await waitForSync(projects, 'initial') + + invitorProject.$sync.start() + inviteeProject.$sync.start() + await waitForSync(projects, 'full') + + assert(await inviteeProject.observation.getByDocId(obs1.docId)) + assert(await invitorProject.observation.getByDocId(obs2.docId)) + + invitorProject.$sync.stop() + inviteeProject.$sync.stop() + + const obs3 = await invitorProject.observation.create( + valueOf(generatedObservations[2]) + ) + + invitorProject.$sync.start() + + await assert.rejects( + () => pTimeout(waitForSync(projects, 'full'), { milliseconds: 1000 }), + 'wait for sync times out' + ) + + await assert.rejects( + () => inviteeProject.observation.getByDocId(obs3.docId), + 'one side stopping sync should prevent data from syncing' + ) +}) + test('auto-stop', async (t) => { const clock = FakeTimers.install({ shouldAdvanceTime: true }) t.after(() => clock.uninstall()) @@ -332,6 +386,7 @@ test('auto-stop', async (t) => { ) invitorProject.$sync.start() + inviteeProject.$sync.start() const observation3 = await invitorProject.observation.create( valueOf(generatedObservations[2]) diff --git a/test-e2e/utils.js b/test-e2e/utils.js index 5fa769262..2d1f5691b 100644 --- a/test-e2e/utils.js +++ b/test-e2e/utils.js @@ -567,6 +567,19 @@ export async function getExpectedConfig(path) { export function sortById(docs) { return sortBy(docs, 'docId') } + +/** + * @template T + * @param {Readonly>} set + * @param {ReadonlyArray} toAdd + * @returns {Set} + */ +export function setAdd(set, ...toAdd) { + const result = new Set(set) + for (const value of toAdd) result.add(value) + return result +} + /** * Lazy way of removing fields with undefined values from an object * @param {unknown} object @@ -591,3 +604,12 @@ export function randomNum({ min = 0, max = 1, precision } = {}) { if (typeof precision === 'undefined') return num return round(num, precision) } + +/** + * @template T + * @param {Readonly>} arr + * @returns {undefined | T} + */ +export function sample(arr) { + return arr[Math.floor(Math.random() * arr.length)] +} diff --git a/tests/core-manager.js b/tests/core-manager.js index fd37d193c..3de592461 100644 --- a/tests/core-manager.js +++ b/tests/core-manager.js @@ -1,5 +1,6 @@ import test from 'node:test' import { access, constants } from 'node:fs/promises' +import { setTimeout as delay } from 'node:timers/promises' import NoiseSecretStream from '@hyperswarm/secret-stream' import Hypercore from 'hypercore' import RAM from 'random-access-memory' @@ -11,6 +12,7 @@ import { CoreManager, kCoreManagerReplicate, } from '../src/core-manager/index.js' +import { unreplicate } from '../src/lib/hypercore-helpers.js' import RemoteBitfield from '../src/core-manager/remote-bitfield.js' import assert from 'node:assert/strict' import { once } from 'node:events' @@ -393,72 +395,125 @@ test('sends "haves" bitfields over project creator core replication stream', asy test('unreplicate', async (t) => { const WAIT_TIMEOUT = 200 const REPLICATION_DELAY = 20 - await t.test('initiator unreplicates, receiver re-replicates', async () => { - const a = await createCore() - await a.append(['a', 'b']) - const b = await createCore(a.key) - - const [s1, s2] = replicateCores(a, b, { delay: REPLICATION_DELAY }) - - const block1 = await b.get(0, { timeout: WAIT_TIMEOUT }) - assert.equal(block1?.toString(), 'a') - - await unreplicate(a, s1.noiseStream.userData) - - await assert.rejects( - () => b.get(1, { timeout: WAIT_TIMEOUT }), - 'Throws with timeout error' - ) - - b.replicate(s2) - - const block2 = await b.get(1, { timeout: WAIT_TIMEOUT }) - assert.equal(block2?.toString(), 'b') - }) - await t.test('initiator unreplicates, initiator re-replicates', async () => { - const a = await createCore() - await a.append(['a', 'b']) - const b = await createCore(a.key) - - const [s1] = replicateCores(a, b, { delay: REPLICATION_DELAY }) - - const block1 = await b.get(0, { timeout: WAIT_TIMEOUT }) - assert.equal(block1?.toString(), 'a') - - await unreplicate(a, s1.noiseStream.userData) - - await assert.rejects( - () => b.get(1, { timeout: 200 }), - 'Throws with timeout error' - ) - - a.replicate(s1) - - const block2 = await b.get(1, { timeout: WAIT_TIMEOUT }) - assert.equal(block2?.toString(), 'b') - }) - await t.test('receiver unreplicates, receiver re-replicates', async () => { - const a = await createCore() - await a.append(['a', 'b']) - const b = await createCore(a.key) - - const [, s2] = replicateCores(a, b, { delay: REPLICATION_DELAY }) - - const block1 = await b.get(0, { timeout: WAIT_TIMEOUT }) - assert.equal(block1?.toString(), 'a') - - await unreplicate(b, s2.noiseStream.userData) - - await assert.rejects( - () => b.get(1, { timeout: WAIT_TIMEOUT }), - 'Throws with timeout error' - ) + const scenarios = [ + { + unreplicate: ['initiator', 'receiver'], + rereplicate: ['initiator', 'receiver'], + expectedReadAfterReplicate: true, + }, + { + unreplicate: ['initiator'], + rereplicate: ['initiator'], + expectedReadAfterReplicate: true, + }, + { + unreplicate: ['receiver'], + rereplicate: ['receiver'], + expectedReadAfterReplicate: true, + }, + { + unreplicate: ['initiator', 'receiver'], + rereplicate: ['initiator'], + expectedReadAfterReplicate: false, + }, + { + unreplicate: ['initiator', 'receiver'], + rereplicate: ['receiver'], + expectedReadAfterReplicate: false, + }, + ] + // Add order permutations to scenarios + for (const scenario of [...scenarios]) { + if (scenario.unreplicate.length !== 2) continue + scenarios.push({ + ...scenario, + unreplicate: [scenario.unreplicate[1], scenario.unreplicate[0]], + }) + } + for (const scenario of [...scenarios]) { + if (scenario.rereplicate.length !== 2) continue + scenarios.push({ + ...scenario, + rereplicate: [scenario.rereplicate[1], scenario.rereplicate[0]], + }) + } - b.replicate(s2) + for (const unreplicateWait of [0, 100]) { + for (const scenario of scenarios) { + await t.test( + `unreplicate: ${scenario.unreplicate.join( + ', ' + )}; rereplicate: ${scenario.rereplicate.join( + ', ' + )}; delay: ${unreplicateWait}; expectedReadAfterReplicate: ${ + scenario.expectedReadAfterReplicate + }`, + async () => { + const a = await createCore() + await a.append(['a', 'b']) + const b = await createCore(a.key) + const c = await createCore(a.key) + + const [s1, s2] = replicateCores(a, b, { delay: REPLICATION_DELAY }) + replicateCores(a, c, { delay: REPLICATION_DELAY }) + + // Check replication is actually working + { + const block1 = await b.get(0, { timeout: WAIT_TIMEOUT }) + assert.equal(block1?.toString(), 'a') + } + { + const block1 = await c.get(0, { timeout: WAIT_TIMEOUT }) + assert.equal(block1?.toString(), 'a') + } + + // Unreplicate in order with delay + for (const toUnreplicate of scenario.unreplicate) { + const coreToUnreplicate = toUnreplicate === 'initiator' ? a : b + const protomuxToUnreplicate = + toUnreplicate === 'initiator' ? s1 : s2 + unreplicate( + coreToUnreplicate, + protomuxToUnreplicate.noiseStream.userData + ) + await delay(unreplicateWait) + } + + // Check that we can't read the next block + await assert.rejects( + () => b.get(1, { timeout: WAIT_TIMEOUT }), + 'Throws with timeout error' + ) - const block2 = await b.get(1, { timeout: WAIT_TIMEOUT }) - assert.equal(block2?.toString(), 'b') - }) + // Re-replicate in order with delay + for (const toRereplicate of scenario.rereplicate) { + const coreToRereplicate = toRereplicate === 'initiator' ? a : b + const protomuxToRereplicate = + toRereplicate === 'initiator' ? s1 : s2 + coreToRereplicate.replicate(protomuxToRereplicate) + await delay(unreplicateWait) + } + + // Check that we can read or not read the next block as expected + if (scenario.expectedReadAfterReplicate) { + const block2 = await b.get(1, { timeout: WAIT_TIMEOUT }) + assert.equal(block2?.toString(), 'b') + } else { + await assert.rejects( + () => b.get(1, { timeout: WAIT_TIMEOUT }), + 'Throws with timeout error' + ) + } + + // Replication with code 'c' should still work + { + const block2 = await c.get(1, { timeout: WAIT_TIMEOUT }) + assert.equal(block2?.toString(), 'b') + } + } + ) + } + } }) test('deleteOthersData()', async () => { @@ -699,20 +754,6 @@ function latencyStream(delay = 0) { }) } -/** - * - * @param {Hypercore<'binary', any>} core - * @param {import('protomux')} protomux - */ -export function unreplicate(core, protomux) { - const peerToUnreplicate = core.peers.find( - (peer) => peer.protomux === protomux - ) - if (!peerToUnreplicate) return - peerToUnreplicate.channel.close() - return -} - /** * From https://github.com/holepunchto/corestore/blob/v6.8.4/index.js#L240 * diff --git a/types/protomux.d.ts b/types/protomux.d.ts index d1c4d5657..5930dcd70 100644 --- a/types/protomux.d.ts +++ b/types/protomux.d.ts @@ -46,6 +46,7 @@ declare module 'protomux' { class Protomux { constructor(stream: TStream) + [Symbol.iterator](): IterableIterator isProtomux: true stream: TStream static from(stream: TStream): Protomux