From aa30f7504029cd975cb44cda397227aff906a8af Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Tue, 28 Nov 2023 18:01:31 +0900 Subject: [PATCH] feat: integrate sync and project invites (#362) * WIP initial work * rename Rpc to LocalPeers * Handle deviceInfo internally, id -> deviceId * Tests for stream error handling * remove unnecessary constructor * return replication stream * Attach protomux instance to peer info * rename and re-organize * revert changes outside scope of PR * WIP initial work * Tie everything together * rename getProjectInstance * feat: client.listLocalPeers() & `local-peers` evt * feat: add $sync API methods For now this simplifies the API (because we are only supporting local sync, not remote sync over the internet) to: - `project.$sync.getState()` - `project.$sync.start()` - `project.$sync.stop()` - Events - `sync-state` It's currently not possible to stop local discovery, nor is it possible to stop sync of the metadata namespaces (auth, config, blobIndex). The start and stop methods stop the sync of the data and blob namespaces. Fixes #134. Stacked on #360, #358 and #356. * feat: Add project.$waitForInitialSync() method Fixes Add project method to download auth + config cores #233 Rather than call this inside the `client.addProject()` method, instead I think it is better for the API consumer to call `project.$waitForInitialSync()` after adding a project, since this allows the implementer to give user feedback about what is happening. * Wait for initial sync within addProject() * fix: don't add core bitfield until core is ready * feat: expose deviceId on coreManager * fix: wait for project.ready() in waitForInitialSync * fix: skip waitForSync in tests * don't enable/disable namespace if not needed * start core download when created via sparse: false * Add debug logging This was a big lift, but necessary to be able to debug sync issues since temporarily adding console.log statements was too much work, and debugging requires knowing the deviceId associated with each message. * fix timeout * fix: Add new cores to the indexer (!!!) This caused a day of work: a bug from months back * remove unnecessary log stmt * get capabilities.getMany() to include creator * fix invite test * keep blob cores sparse * optional param for LocalPeers * re-org sync and replication Removes old replication code attached to CoreManager Still needs tests to be updated * update package-lock * chore: Add debug logging * Add new logger to discovery + dnssd * Get invite test working * fix manager logger * cleanup invite test (and make it fail :( * fix: handle duplicate connections to LocalPeers * fix stream close before channel open * send invite to non-existent peer * fixed fake timers implementation for tests * new tests for duplicate connections * cleanup and small fix * Better state debug logging * chain of invites test * fix max listeners and add skipped test * fix: only request a core key from one peer Reduces the number of duplicate requests for the same keys. * cleanup members tests with new helprs * wait for project ready when adding * only create 4 clients for chain of invites test * add e2e sync tests * add published @mapeo/mock-data * fix: don't open cores in sparse mode Turns out this changes how core.length etc. work, which confuses things * fix: option to skip auto download for tests * e2e test for stop-start sync * fix coreManager unit tests * fix blob store tests * fix discovery-key event * add coreCount to sync state * test sync with blocked peer & fix bugs * fix datatype unit tests * fix blobs server unit tests * remote peer-sync-controller unit test This is now tested in e2e tests * fix type issues caused by bad lockfile * ignore debug type errors * fixes for review comments * move utils-new into utils * Add debug info to test that sometimes fails * Update package-lock.json version --------- Co-authored-by: Andrew Chou --- package-lock.json | 148 +++++++- package.json | 2 + src/capabilities.js | 10 +- src/core-manager/index.js | 321 +++++++++--------- src/core-manager/replication-state-machine.js | 76 ----- src/datastore/index.js | 16 +- src/local-peers.js | 4 +- src/logger.js | 34 +- src/mapeo-manager.js | 173 ++++++++-- src/mapeo-project.js | 54 +-- src/sync/core-sync-state.js | 11 +- src/sync/namespace-sync-state.js | 9 +- src/sync/peer-sync-controller.js | 79 +++-- src/sync/sync-api.js | 128 ++++++- src/utils.js | 2 +- test-e2e/capabilities.js | 22 +- test-e2e/device-info.js | 11 +- test-e2e/manager-basic.js | 52 +-- test-e2e/manager-invite.js | 280 ++++++++------- test-e2e/members.js | 181 ++++------ test-e2e/project-crud.js | 19 +- test-e2e/sync.js | 256 ++++++++++++++ test-e2e/utils.js | 302 ++++++++++++++++ tests/blob-store/blob-store.js | 30 +- tests/core-manager.js | 235 ++----------- tests/data-type.js | 16 +- tests/fastify-plugins/blobs.js | 11 +- tests/helpers/blob-store.js | 21 -- tests/helpers/core-manager.js | 12 +- tests/local-peers.js | 2 +- tests/sync/peer-sync-controller.js | 188 ---------- types/corestore.d.ts | 1 + 32 files changed, 1618 insertions(+), 1088 deletions(-) delete mode 100644 src/core-manager/replication-state-machine.js create mode 100644 test-e2e/sync.js delete mode 100644 tests/sync/peer-sync-controller.js diff --git a/package-lock.json b/package-lock.json index e326a3d9d..34a791f94 100644 --- a/package-lock.json +++ b/package-lock.json @@ -53,6 +53,7 @@ "devDependencies": { "@bufbuild/buf": "^1.26.1", "@hyperswarm/testnet": "^3.1.2", + "@mapeo/mock-data": "^1.0.1", "@sinonjs/fake-timers": "^10.0.2", "@types/b4a": "^1.6.0", "@types/debug": "^4.1.8", @@ -77,6 +78,7 @@ "prettier": "^2.8.8", "random-access-file": "^4.0.4", "random-access-memory": "^6.2.0", + "random-bytes-readable-stream": "^3.0.0", "rimraf": "^5.0.1", "streamx": "^2.15.1", "tempy": "^3.1.0", @@ -377,6 +379,22 @@ "node": "^12.22.0 || ^14.17.0 || >=16.0.0" } }, + "node_modules/@faker-js/faker": { + "version": "8.3.1", + "resolved": "https://registry.npmjs.org/@faker-js/faker/-/faker-8.3.1.tgz", + "integrity": "sha512-FdgpFxY6V6rLZE9mmIBb9hM0xpfvQOSNOLnzolzKwsE1DH+gC7lEKV1p1IbR0lAYyvYd5a4u3qWJzowUkw1bIw==", + "dev": true, + "funding": [ + { + "type": "opencollective", + "url": "https://opencollective.com/fakerjs" + } + ], + "engines": { + "node": "^14.17.0 || ^16.13.0 || >=18.0.0", + "npm": ">=6.14.13" + } + }, "node_modules/@fastify/ajv-compiler": { "version": "3.5.0", "license": "MIT", @@ -651,6 +669,22 @@ "z32": "^1.0.0" } }, + "node_modules/@mapeo/mock-data": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/@mapeo/mock-data/-/mock-data-1.0.1.tgz", + "integrity": "sha512-Mdmyn3dCjF3wuwuGJhjGjVnydwHoZPgpAyA8JJvZDBNr1qUbeQyq6mrBr+Tg90ZBRY7G9S0wxABD+OJOKZhQdg==", + "dev": true, + "dependencies": { + "@faker-js/faker": "^8.3.1", + "@mapeo/schema": "^3.0.0-next.11", + "json-schema-faker": "^0.5.3", + "type-fest": "^4.8.0" + }, + "bin": { + "generate-mapeo-data": "bin/generate-mapeo-data.js", + "list-mapeo-schemas": "bin/list-mapeo-schemas.js" + } + }, "node_modules/@mapeo/schema": { "version": "3.0.0-next.13", "resolved": "https://registry.npmjs.org/@mapeo/schema/-/schema-3.0.0-next.13.tgz", @@ -1431,6 +1465,12 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/call-me-maybe": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/call-me-maybe/-/call-me-maybe-1.0.2.tgz", + "integrity": "sha512-HpX65o1Hnr9HH25ojC1YGs7HCQLq0GCOibSaWER0eNpgJ/Z1MZv2mTc7+xh6WOPxbRVcmgbv4hGU+uSQ/2xFZQ==", + "dev": true + }, "node_modules/callsites": { "version": "3.1.0", "dev": true, @@ -2565,6 +2605,19 @@ "url": "https://opencollective.com/eslint" } }, + "node_modules/esprima": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/esprima/-/esprima-4.0.1.tgz", + "integrity": "sha512-eGuFFw7Upda+g4p+QHvnW0RyTX/SVeJBDM/gCtMARO0cLuT2HcEKnTPvhjV6aGeqrCB/sbNop0Kszm0jsaWU4A==", + "dev": true, + "bin": { + "esparse": "bin/esparse.js", + "esvalidate": "bin/esvalidate.js" + }, + "engines": { + "node": ">=4" + } + }, "node_modules/esquery": { "version": "1.5.0", "dev": true, @@ -2967,6 +3020,12 @@ "node": ">=8.0.0" } }, + "node_modules/format-util": { + "version": "1.0.5", + "resolved": "https://registry.npmjs.org/format-util/-/format-util-1.0.5.tgz", + "integrity": "sha512-varLbTj0e0yVyRpqQhuWV+8hlePAgaoFRhNFj50BNjEIrw1/DphHSObtqwskVCPWNgzwPoQrZAbfa/SBiicNeg==", + "dev": true + }, "node_modules/forwarded": { "version": "0.2.0", "license": "MIT", @@ -3987,6 +4046,53 @@ "dev": true, "license": "MIT" }, + "node_modules/json-schema-faker": { + "version": "0.5.3", + "resolved": "https://registry.npmjs.org/json-schema-faker/-/json-schema-faker-0.5.3.tgz", + "integrity": "sha512-BeIrR0+YSrTbAR9dOMnjbFl1MvHyXnq+Wpdw1FpWZDHWKLzK229hZ5huyPcmzFUfVq1ODwf40WdGVoE266UBUg==", + "dev": true, + "dependencies": { + "json-schema-ref-parser": "^6.1.0", + "jsonpath-plus": "^7.2.0" + }, + "bin": { + "jsf": "bin/gen.cjs" + } + }, + "node_modules/json-schema-ref-parser": { + "version": "6.1.0", + "resolved": "https://registry.npmjs.org/json-schema-ref-parser/-/json-schema-ref-parser-6.1.0.tgz", + "integrity": "sha512-pXe9H1m6IgIpXmE5JSb8epilNTGsmTb2iPohAXpOdhqGFbQjNeHHsZxU+C8w6T81GZxSPFLeUoqDJmzxx5IGuw==", + "deprecated": "Please switch to @apidevtools/json-schema-ref-parser", + "dev": true, + "dependencies": { + "call-me-maybe": "^1.0.1", + "js-yaml": "^3.12.1", + "ono": "^4.0.11" + } + }, + "node_modules/json-schema-ref-parser/node_modules/argparse": { + "version": "1.0.10", + "resolved": "https://registry.npmjs.org/argparse/-/argparse-1.0.10.tgz", + "integrity": "sha512-o5Roy6tNG4SL/FOkCAN6RzjiakZS25RLYFrcMttJqbdd8BWrnA+fGz57iN5Pb06pvBGvl5gQ0B48dJlslXvoTg==", + "dev": true, + "dependencies": { + "sprintf-js": "~1.0.2" + } + }, + "node_modules/json-schema-ref-parser/node_modules/js-yaml": { + "version": "3.14.1", + "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-3.14.1.tgz", + "integrity": "sha512-okMH7OXXJ7YrN9Ok3/SXrnu4iX9yOk+25nqX4imS2npuvTYDmo/QEZoqwZkYaIDk3jVvBOTOIEgEhaLOynBS9g==", + "dev": true, + "dependencies": { + "argparse": "^1.0.7", + "esprima": "^4.0.0" + }, + "bin": { + "js-yaml": "bin/js-yaml.js" + } + }, "node_modules/json-schema-traverse": { "version": "0.4.1", "dev": true, @@ -4031,6 +4137,15 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/jsonpath-plus": { + "version": "7.2.0", + "resolved": "https://registry.npmjs.org/jsonpath-plus/-/jsonpath-plus-7.2.0.tgz", + "integrity": "sha512-zBfiUPM5nD0YZSBT/o/fbCUlCcepMIdP0CJZxM1+KgA4f2T206f6VAg9e7mX35+KlMaIc5qXW34f3BnwJ3w+RA==", + "dev": true, + "engines": { + "node": ">=12.0.0" + } + }, "node_modules/junk": { "version": "4.0.1", "dev": true, @@ -5157,6 +5272,15 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/ono": { + "version": "4.0.11", + "resolved": "https://registry.npmjs.org/ono/-/ono-4.0.11.tgz", + "integrity": "sha512-jQ31cORBFE6td25deYeD80wxKBMj+zBmHTrVxnc6CKhx8gho6ipmWM5zj/oeoqioZ99yqBls9Z/9Nss7J26G2g==", + "dev": true, + "dependencies": { + "format-util": "^1.0.3" + } + }, "node_modules/open": { "version": "7.4.2", "resolved": "https://registry.npmjs.org/open/-/open-7.4.2.tgz", @@ -5824,6 +5948,18 @@ "version": "1.0.0", "license": "MIT" }, + "node_modules/random-bytes-readable-stream": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/random-bytes-readable-stream/-/random-bytes-readable-stream-3.0.0.tgz", + "integrity": "sha512-GrDPlkikCTvAOClNgxbbZg2Xq84lmBqhCkkPuDh92vgsDfW9dvrp4kQpE7AOL/3B3j3BATkQocpnCp/bUBn9NQ==", + "dev": true, + "engines": { + "node": "^12.20.0 || ^14.13.1 || >=16.0.0" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/random-bytes-seed": { "version": "1.0.3", "resolved": "https://registry.npmjs.org/random-bytes-seed/-/random-bytes-seed-1.0.3.tgz", @@ -6729,6 +6865,12 @@ "node": ">= 10.x" } }, + "node_modules/sprintf-js": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/sprintf-js/-/sprintf-js-1.0.3.tgz", + "integrity": "sha512-D9cPgkvLlV3t3IzL0D0YLvGA9Ahk4PcvVwUbN0dSGr1aP0Nrt4AEnTUbuGvquEC0mA64Gqt1fzirlRs5ibXx8g==", + "dev": true + }, "node_modules/stackframe": { "version": "1.3.4", "dev": true, @@ -7201,9 +7343,9 @@ } }, "node_modules/type-fest": { - "version": "4.5.0", - "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-4.5.0.tgz", - "integrity": "sha512-diLQivFzddJl4ylL3jxSkEc39Tpw7o1QeEHIPxVwryDK2lpB7Nqhzhuo6v5/Ls08Z0yPSAhsyAWlv1/H0ciNmw==", + "version": "4.8.1", + "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-4.8.1.tgz", + "integrity": "sha512-ShaaYnjf+0etG8W/FumARKMjjIToy/haCaTjN2dvcewOSoNqCQzdgG7m2JVOlM5qndGTHjkvsrWZs+k/2Z7E0Q==", "engines": { "node": ">=16" }, diff --git a/package.json b/package.json index c642be21f..5374bdd0a 100644 --- a/package.json +++ b/package.json @@ -68,6 +68,7 @@ "devDependencies": { "@bufbuild/buf": "^1.26.1", "@hyperswarm/testnet": "^3.1.2", + "@mapeo/mock-data": "^1.0.1", "@sinonjs/fake-timers": "^10.0.2", "@types/b4a": "^1.6.0", "@types/debug": "^4.1.8", @@ -92,6 +93,7 @@ "prettier": "^2.8.8", "random-access-file": "^4.0.4", "random-access-memory": "^6.2.0", + "random-bytes-readable-stream": "^3.0.0", "rimraf": "^5.0.1", "streamx": "^2.15.1", "tempy": "^3.1.0", diff --git a/src/capabilities.js b/src/capabilities.js index 05d8b4485..8b683c95c 100644 --- a/src/capabilities.js +++ b/src/capabilities.js @@ -147,6 +147,8 @@ export class Capabilities { #projectCreatorAuthCoreId #ownDeviceId + static NO_ROLE_CAPABILITIES = NO_ROLE_CAPABILITIES + /** * * @param {object} opts @@ -209,17 +211,21 @@ export class Capabilities { */ async getAll() { const roles = await this.#dataType.getMany() + /** @type {Record} */ + const capabilities = {} let projectCreatorDeviceId try { projectCreatorDeviceId = await this.#coreOwnership.getOwner( this.#projectCreatorAuthCoreId ) + // Default to creator capabilities, but can be overwritten if a different + // role is set below + capabilities[projectCreatorDeviceId] = CREATOR_CAPABILITIES } catch (e) { // Not found, we don't know who the project creator is so we can't include // them in the returned map } - /** @type {Record} */ - const capabilities = {} + for (const role of roles) { const deviceId = role.docId if (!isKnownRoleId(role.roleId)) continue diff --git a/src/core-manager/index.js b/src/core-manager/index.js index 3f085738d..9168449a1 100644 --- a/src/core-manager/index.js +++ b/src/core-manager/index.js @@ -2,14 +2,15 @@ import { TypedEmitter } from 'tiny-typed-emitter' import Corestore from 'corestore' import assert from 'node:assert' -import { once } from 'node:events' -import Hypercore from 'hypercore' import { HaveExtension, ProjectExtension } from '../generated/extensions.js' import { CoreIndex } from './core-index.js' -import { ReplicationStateMachine } from './replication-state-machine.js' import * as rle from './bitfield-rle.js' import { Logger } from '../logger.js' +import { keyToId } from '../utils.js' +import { discoveryKey } from 'hypercore-crypto' +import Hypercore from 'hypercore' +export const kCoreManagerReplicate = Symbol('replicate core manager') // WARNING: Changing these will break things for existing apps, since namespaces // are used for key derivation export const NAMESPACES = /** @type {const} */ ([ @@ -30,7 +31,6 @@ const CREATE_SQL = `CREATE TABLE IF NOT EXISTS ${TABLE} ( /** @typedef {(typeof NAMESPACES)[number]} Namespace */ /** @typedef {{ core: Core, key: Buffer, namespace: Namespace }} CoreRecord */ /** @typedef {import('streamx').Duplex} DuplexStream */ -/** @typedef {{ rsm: ReplicationStateMachine, stream: DuplexStream, cores: Set }} ReplicationRecord */ /** * @typedef {Object} Events * @property {(coreRecord: CoreRecord) => void} add-core @@ -48,8 +48,6 @@ export class CoreManager extends TypedEmitter { #projectKey #addCoreSqlStmt #encryptionKeys - /** @type {Set} */ - #replications = new Set() #projectExtension /** @type {'opened' | 'closing' | 'closed'} */ #state = 'opened' @@ -57,6 +55,13 @@ export class CoreManager extends TypedEmitter { #haveExtension #deviceId #l + /** + * We use this to reduce network traffic caused by requesting the same key + * from multiple clients. + * TODO: Remove items from this set after a max age + */ + #keyRequests = new TrackedKeyRequests() + #autoDownload static get namespaces() { return NAMESPACES @@ -70,6 +75,7 @@ export class CoreManager extends TypedEmitter { * @param {Buffer} [options.projectSecretKey] 32-byte secret key of the project creator core * @param {Partial>} [options.encryptionKeys] Encryption keys for each namespace * @param {import('hypercore').HypercoreStorage} options.storage Folder to store all hypercore data + * @param {boolean} [options.autoDownload=true] Immediately start downloading cores - should only be set to false for tests * @param {Logger} [options.logger] */ constructor({ @@ -79,6 +85,7 @@ export class CoreManager extends TypedEmitter { projectSecretKey, encryptionKeys = {}, storage, + autoDownload = true, logger, }) { super() @@ -90,11 +97,14 @@ export class CoreManager extends TypedEmitter { !projectSecretKey || projectSecretKey.length === 64, 'project owner core secret key must be 64-byte buffer' ) + // Each peer will attach a listener, so max listeners is max attached peers + this.setMaxListeners(0) this.#l = Logger.create('coreManager', logger) const primaryKey = keyManager.getDerivedKey('primaryKey', projectKey) this.#deviceId = keyManager.getIdentityKeypair().publicKey.toString('hex') this.#projectKey = projectKey this.#encryptionKeys = encryptionKeys + this.#autoDownload = autoDownload // Make sure table exists for persisting known cores sqlite.prepare(CREATE_SQL).run() @@ -161,6 +171,12 @@ export class CoreManager extends TypedEmitter { this.#creatorCore.on('peer-add', (peer) => { this.#sendHaves(peer) }) + this.#creatorCore.on('peer-remove', (peer) => { + // When a peer is removed we clean up any unanswered key requests, so that + // we will request from a different peer, and to avoid the tracking of key + // requests growing without bounds. + this.#keyRequests.deleteByPeerKey(peer.remotePublicKey) + }) this.#ready = Promise.all( [...this.#coreIndex].map(({ core }) => core.ready()) @@ -222,13 +238,13 @@ export class CoreManager extends TypedEmitter { * Get a core by its discovery key * * @param {Buffer} discoveryKey - * @returns {Core | undefined} + * @returns {CoreRecord | undefined} */ getCoreByDiscoveryKey(discoveryKey) { const coreRecord = this.#coreIndex.getByDiscoveryId( discoveryKey.toString('hex') ) - return coreRecord && coreRecord.core + return coreRecord } /** @@ -237,14 +253,11 @@ export class CoreManager extends TypedEmitter { */ async close() { this.#state = 'closing' + this.#keyRequests.clear() const promises = [] for (const { core } of this.#coreIndex) { promises.push(core.close()) } - for (const { stream } of this.#replications) { - promises.push(once(stream, 'close')) - stream.destroy() - } await Promise.all(promises) this.#state = 'closed' } @@ -279,6 +292,11 @@ export class CoreManager extends TypedEmitter { keyPair, encryptionKey: this.#encryptionKeys[namespace], }) + if (namespace !== 'blob' && this.#autoDownload) { + core.download({ start: 0, end: -1 }) + } + // Every peer adds a listener, so could have many peers + core.setMaxListeners(0) // @ts-ignore - ensure key is defined before hypercore is ready core.key = key this.#coreIndex.add({ core, key, namespace, writer }) @@ -308,13 +326,6 @@ export class CoreManager extends TypedEmitter { }) } - for (const { stream, rsm, cores } of this.#replications.values()) { - if (cores.has(core)) continue - if (rsm.state.enabledNamespaces.has(namespace)) { - core.replicate(stream) - } - } - if (persist) { this.#addCoreSqlStmt.run({ publicKey: key, namespace }) } @@ -331,140 +342,46 @@ export class CoreManager extends TypedEmitter { } /** - * Start replicating cores managed by CoreManager to a Noise Secret Stream (as - * created by @hyperswarm/secret-stream). Important: only one CoreManager can - * be replicated to a given stream - attempting to replicate a second - * CoreManager to the same stream will cause sharing of auth core keys to - * fail - see https://github.com/holepunchto/corestore/issues/45 - * - * Initially only cores in the `auth` namespace are replicated to the stream. - * All cores in the `auth` namespace are shared to all peers who have the - * `rootCoreKey` core, and also replicated to the stream + * Send an extension message over the project creator core replication stream + * requesting a core key for the given discovery key. * - * To start replicating other namespaces call `enableNamespace(ns)` on the - * returned state machine - * - * @param {import('../types.js').NoiseStream | import('../types.js').ProtocolStream} noiseStream framed noise secret stream, i.e. @hyperswarm/secret-stream - */ - replicate(noiseStream) { - if (this.#state !== 'opened') throw new Error('Core manager is closed') - if ( - /** @type {import('../types.js').ProtocolStream} */ (noiseStream) - .noiseStream?.userData - ) { - console.warn( - 'Passed an existing protocol stream to coreManager.replicate(). Other corestores and core managers replicated to this stream will no longer automatically inject shared cores into the stream' - ) - } - // @ts-expect-error - too complex to type right now - const stream = Hypercore.createProtocolStream(noiseStream) - const protocol = stream.noiseStream.userData - if (!protocol) throw new Error('Invalid stream') - // If the noise stream already had a protomux instance attached to - // noiseStream.userData, then Hypercore.createProtocolStream does not attach - // the ondiscoverykey listener, so we make sure we are listening for this, - // and that we override any previous notifier that was attached to protomux. - // This means that only one Core Manager instance can currently be - // replicated to a stream if we want sharing of unknown auth cores to work. - protocol.pair( - { protocol: 'hypercore/alpha' }, - /** @param {Buffer} discoveryKey */ async (discoveryKey) => { - this.handleDiscoveryKey(discoveryKey, stream) - } - ) - - /** @type {ReplicationRecord['cores']} */ - const replicatingCores = new Set() - const rsm = new ReplicationStateMachine({ - enableNamespace: (namespace) => { - for (const { core } of this.getCores(namespace)) { - if (replicatingCores.has(core)) continue - core.replicate(protocol) - replicatingCores.add(core) - } - }, - disableNamespace: (namespace) => { - for (const { core } of this.getCores(namespace)) { - if (core === this.creatorCore) continue - unreplicate(core, protocol) - replicatingCores.delete(core) - } - }, - }) - - // Always need to replicate the project creator core - this.creatorCore.replicate(protocol) - replicatingCores.add(this.creatorCore) - - // For now enable auth namespace here, rather than in sync controller - rsm.enableNamespace('auth') - - /** @type {ReplicationRecord} */ - const replicationRecord = { stream, rsm, cores: replicatingCores } - this.#replications.add(replicationRecord) - - stream.once('close', () => { - rsm.disableAll() - rsm.removeAllListeners() - this.#replications.delete(replicationRecord) - }) - - return rsm - } - - /** + * @param {Buffer} peerKey * @param {Buffer} discoveryKey - * @param {any} stream */ - async handleDiscoveryKey(discoveryKey, stream) { - const discoveryId = discoveryKey.toString('hex') - const peer = await this.#findPeer(stream.remotePublicKey) + requestCoreKey(peerKey, discoveryKey) { + // No-op if we already have this core + if (this.getCoreByDiscoveryKey(discoveryKey)) return + const peer = this.#creatorCore.peers.find((peer) => { + return peer.remotePublicKey.equals(peerKey) + }) if (!peer) { - console.warn('handleDiscovery no peer', stream.remotePublicKey) - // TODO: How to handle this and when does it happen? + // This should not happen because this is only called from SyncApi, which + // checks the peer exists before calling this method. + this.#l.log( + 'Attempted to request core key for %h, but no connected peer %h', + discoveryKey, + peerKey + ) return } - // If we already know about this core, then we will add it to the - // replication stream when we are ready - if (this.#coreIndex.getByDiscoveryId(discoveryId)) return + // Only request a key once, e.g. from the peer we first receive it from (we + // can assume that a peer must have the key if we see the discovery key in + // the protomux). This is necessary to reduce network traffic for many newly + // connected peers - otherwise duplicate requests will be sent to every peer + if (this.#keyRequests.has(discoveryKey)) return + this.#keyRequests.set(discoveryKey, peerKey) + + this.#l.log( + 'Requesting core key for discovery key %h from peer %h', + discoveryKey, + peerKey + ) const message = ProjectExtension.fromPartial({ wantCoreKeys: [discoveryKey], }) this.#projectExtension.send(message, peer) } - /** - * @param {Buffer} publicKey - * @param {{ timeout?: number }} [opts] - */ - async #findPeer(publicKey, { timeout = 200 } = {}) { - const creatorCore = this.#creatorCore - const peer = creatorCore.peers.find((peer) => { - return peer.remotePublicKey.equals(publicKey) - }) - if (peer) return peer - // This is called from the from the handleDiscoveryId event, which can - // happen before the peer connection is fully established, so we wait for - // the `peer-add` event, with a timeout in case the peer never gets added - return new Promise(function (res) { - const timeoutId = setTimeout(function () { - creatorCore.off('peer-add', onPeer) - res(null) - }, timeout) - - creatorCore.on('peer-add', onPeer) - - /** @param {any} peer */ - function onPeer(peer) { - if (peer.remotePublicKey.equals(publicKey)) { - clearTimeout(timeoutId) - creatorCore.off('peer-add', onPeer) - res(peer) - } - } - }) - } - /** * @param {ProjectExtension} msg * @param {any} peer @@ -473,8 +390,7 @@ export class CoreManager extends TypedEmitter { const message = ProjectExtension.create() let hasKeys = false for (const discoveryKey of wantCoreKeys) { - const discoveryId = discoveryKey.toString('hex') - const coreRecord = this.#coreIndex.getByDiscoveryId(discoveryId) + const coreRecord = this.getCoreByDiscoveryKey(discoveryKey) if (!coreRecord) continue message[`${coreRecord.namespace}CoreKeys`].push(coreRecord.key) hasKeys = true @@ -486,6 +402,7 @@ export class CoreManager extends TypedEmitter { for (const coreKey of coreKeys[`${namespace}CoreKeys`]) { // Use public method - these must be persisted (private method defaults to persisted=false) this.addCore(coreKey, namespace) + this.#keyRequests.deleteByDiscoveryKey(discoveryKey(coreKey)) } } } @@ -539,6 +456,28 @@ export class CoreManager extends TypedEmitter { peer.protomux.uncork() } + + /** + * ONLY FOR TESTING + * Replicate all cores in core manager + * + * @param {Parameters[0]} stream + * @returns + */ + [kCoreManagerReplicate](stream) { + const protocolStream = Hypercore.createProtocolStream(stream, { + ondiscoverykey: async (discoveryKey) => { + const peer = await findPeer( + this.creatorCore, + // @ts-ignore + protocolStream.noiseStream.remotePublicKey + ) + if (!peer) return + this.requestCoreKey(peer.remotePublicKey, discoveryKey) + }, + }) + return this.#corestore.replicate(stream) + } } /** @@ -585,16 +524,92 @@ const HaveExtensionCodec = { }, } +class TrackedKeyRequests { + /** @type {Map} */ + #byDiscoveryId = new Map() + /** @type {Map>} */ + #byPeerId = new Map() + + /** + * @param {Buffer} discoveryKey + * @param {Buffer} peerKey + */ + set(discoveryKey, peerKey) { + const discoveryId = keyToId(discoveryKey) + const peerId = keyToId(peerKey) + const existingForPeer = this.#byPeerId.get(peerId) || new Set() + this.#byDiscoveryId.set(discoveryId, peerId) + existingForPeer.add(discoveryId) + this.#byPeerId.set(peerId, existingForPeer) + return this + } + /** + * @param {Buffer} discoveryKey + */ + has(discoveryKey) { + const discoveryId = keyToId(discoveryKey) + return this.#byDiscoveryId.has(discoveryId) + } + /** + * @param {Buffer} discoveryKey + */ + deleteByDiscoveryKey(discoveryKey) { + const discoveryId = keyToId(discoveryKey) + const peerId = this.#byDiscoveryId.get(discoveryId) + if (!peerId) return false + this.#byDiscoveryId.delete(discoveryId) + const existingForPeer = this.#byPeerId.get(peerId) + if (existingForPeer) { + existingForPeer.delete(discoveryId) + } + return true + } + /** + * @param {Buffer} peerKey + */ + deleteByPeerKey(peerKey) { + const peerId = keyToId(peerKey) + const existingForPeer = this.#byPeerId.get(peerId) + if (!existingForPeer) return + for (const discoveryId of existingForPeer) { + this.#byDiscoveryId.delete(discoveryId) + } + this.#byPeerId.delete(peerId) + } + clear() { + this.#byDiscoveryId.clear() + this.#byPeerId.clear() + } +} + /** - * - * @param {Hypercore<'binary', any>} core - * @param {import('protomux')} protomux + * @param {Hypercore<"binary", Buffer>} core + * @param {Buffer} publicKey + * @param {{ timeout?: number }} [opts] */ -export function unreplicate(core, protomux) { - const peerToUnreplicate = core.peers.find( - (peer) => peer.protomux === protomux - ) - if (!peerToUnreplicate) return - peerToUnreplicate.channel.close() - return +function findPeer(core, publicKey, { timeout = 200 } = {}) { + const peer = core.peers.find((peer) => { + return peer.remotePublicKey.equals(publicKey) + }) + if (peer) return peer + // This is called from the from the handleDiscoveryId event, which can + // happen before the peer connection is fully established, so we wait for + // the `peer-add` event, with a timeout in case the peer never gets added + return new Promise(function (res) { + const timeoutId = setTimeout(function () { + core.off('peer-add', onPeer) + res(null) + }, timeout) + + core.on('peer-add', onPeer) + + /** @param {any} peer */ + function onPeer(peer) { + if (peer.remotePublicKey.equals(publicKey)) { + clearTimeout(timeoutId) + core.off('peer-add', onPeer) + res(peer) + } + } + }) } diff --git a/src/core-manager/replication-state-machine.js b/src/core-manager/replication-state-machine.js deleted file mode 100644 index a71dfe61d..000000000 --- a/src/core-manager/replication-state-machine.js +++ /dev/null @@ -1,76 +0,0 @@ -import { TypedEmitter } from 'tiny-typed-emitter' - -/** @typedef {import('./index.js').Namespace} Namespace */ -/** @typedef {Set} EnabledNamespaces */ -/** @typedef {{ enabledNamespaces: EnabledNamespaces }} ReplicationState */ - -/** - * @typedef {object} StateMachineEvents - * @property {(state: ReplicationState) => void } state - */ - -/** - * A simple state machine to manage which namespaces are enabled for replication - * - * @extends {TypedEmitter} - */ -export class ReplicationStateMachine extends TypedEmitter { - /** @type {ReplicationState} */ - #state = { - enabledNamespaces: new Set(), - } - #enableNamespace - #disableNamespace - - /** - * - * @param {object} opts - * @param {(namespace: Namespace) => void} opts.enableNamespace - * @param {(namespace: Namespace) => void} opts.disableNamespace - */ - constructor({ enableNamespace, disableNamespace }) { - super() - this.#enableNamespace = enableNamespace - this.#disableNamespace = disableNamespace - } - - get state() { - return this.#state - } - - /** - * Enable a namespace for replication - will add known cores in the namespace - * to the replication stream - * - * @param {Namespace} namespace */ - enableNamespace(namespace) { - if (this.#state.enabledNamespaces.has(namespace)) return - this.#state.enabledNamespaces.add(namespace) - this.#enableNamespace(namespace) - this.emit('state', this.#state) - } - - /** - * Disable a namespace for replication - will remove cores in the namespace - * from the replication stream - * - * @param {Namespace} namespace - */ - disableNamespace(namespace) { - if (!this.#state.enabledNamespaces.has(namespace)) return - this.#state.enabledNamespaces.delete(namespace) - this.#disableNamespace(namespace) - this.emit('state', this.#state) - } - - /** - * @internal - * Should only be called when the stream is closed, because no obvious way to - * implement this otherwise. - */ - disableAll() { - if (!this.#state.enabledNamespaces.size) return - this.#state.enabledNamespaces.clear() - this.emit('state', this.#state) - } -} diff --git a/src/datastore/index.js b/src/datastore/index.js index 4655c0c06..be98ffcf1 100644 --- a/src/datastore/index.js +++ b/src/datastore/index.js @@ -66,6 +66,10 @@ export class DataStore extends TypedEmitter { storage, batch: (entries) => this.#handleEntries(entries), }) + coreManager.on('add-core', (coreRecord) => { + if (coreRecord.namespace !== namespace) return + this.#coreIndexer.addCore(coreRecord.core) + }) // Forward events from coreIndexer this.on('newListener', (eventName, listener) => { @@ -164,9 +168,9 @@ export class DataStore extends TypedEmitter { */ async read(versionId) { const { coreDiscoveryKey, index } = parseVersionId(versionId) - const core = this.#coreManager.getCoreByDiscoveryKey(coreDiscoveryKey) - if (!core) throw new Error('Invalid versionId') - const block = await core.get(index, { wait: false }) + const coreRecord = this.#coreManager.getCoreByDiscoveryKey(coreDiscoveryKey) + if (!coreRecord) throw new Error('Invalid versionId') + const block = await coreRecord.core.get(index, { wait: false }) if (!block) throw new Error('Not Found') return decode(block, { coreDiscoveryKey, index }) } @@ -186,9 +190,9 @@ export class DataStore extends TypedEmitter { /** @param {string} versionId */ async readRaw(versionId) { const { coreDiscoveryKey, index } = parseVersionId(versionId) - const core = this.#coreManager.getCoreByDiscoveryKey(coreDiscoveryKey) - if (!core) throw new Error('core not found') - const block = await core.get(index, { wait: false }) + const coreRecord = this.#coreManager.getCoreByDiscoveryKey(coreDiscoveryKey) + if (!coreRecord) throw new Error('core not found') + const block = await coreRecord.core.get(index, { wait: false }) if (!block) throw new Error('Not Found') return block } diff --git a/src/local-peers.js b/src/local-peers.js index aef72c678..fe3df44e6 100644 --- a/src/local-peers.js +++ b/src/local-peers.js @@ -207,7 +207,7 @@ class Peer { * @property {(peers: PeerInfo[]) => void} peers Emitted whenever the connection status of peers changes. An array of peerInfo objects with a peer id and the peer connection status * @property {(peer: PeerInfoConnected) => void} peer-add Emitted when a new peer is connected * @property {(peerId: string, invite: InviteWithKeys) => void} invite Emitted when an invite is received - * @property {(discoveryKey: Buffer, stream: import('./types.js').ReplicationStream) => void} discovery-key Emitted when a new hypercore is replicated (by a peer) to a peer replication stream (passed as the second parameter) + * @property {(discoveryKey: Buffer, protomux: Protomux) => void} discovery-key Emitted when a new hypercore is replicated (by a peer) to a peer protomux instance (passed as the second parameter) */ /** @extends {TypedEmitter} */ @@ -347,7 +347,7 @@ export class LocalPeers extends TypedEmitter { discoveryKey, stream.noiseStream.remotePublicKey ) - this.emit('discovery-key', discoveryKey, outerStream) + this.emit('discovery-key', discoveryKey, protomux) } ) diff --git a/src/logger.js b/src/logger.js index 356e6cc35..80de7b9db 100644 --- a/src/logger.js +++ b/src/logger.js @@ -1,23 +1,51 @@ import createDebug from 'debug' import { discoveryKey } from 'hypercore-crypto' +import mapObject from 'map-obj' +import util from 'util' const TRIM = 7 -createDebug.formatters.h = (v) => { +createDebug.formatters.h = function (v) { if (!Buffer.isBuffer(v)) return '[undefined]' return v.toString('hex').slice(0, TRIM) } -createDebug.formatters.S = (v) => { +createDebug.formatters.S = function (v) { if (typeof v !== 'string') return '[undefined]' return v.slice(0, 7) } -createDebug.formatters.k = (v) => { +createDebug.formatters.k = function (v) { if (!Buffer.isBuffer(v)) return '[undefined]' return discoveryKey(v).toString('hex').slice(0, TRIM) } +/** + * @param {import('./sync/sync-state.js').State} v + * @this {any} */ +createDebug.formatters.X = function (v) { + try { + const mapped = mapObject(v, (k, v) => [ + k, + // @ts-ignore - type checks here don't get us anything + mapObject(v, (k, v) => { + if (k === 'remoteStates') + // @ts-ignore - type checks here don't get us anything + return [k, mapObject(v, (k, v) => [k.slice(0, 7), v])] + return [k, v] + }), + ]) + return util.inspect(mapped, { + colors: true, + depth: 10, + compact: 6, + breakLength: 90, + }) + } catch (e) { + return `[ERROR: $(e.message)]` + } +} + const counts = new Map() export class Logger { diff --git a/src/mapeo-manager.js b/src/mapeo-manager.js index d90e78d12..660ba2717 100644 --- a/src/mapeo-manager.js +++ b/src/mapeo-manager.js @@ -30,6 +30,8 @@ import { LocalPeers } from './local-peers.js' import { InviteApi } from './invite-api.js' import { MediaServer } from './media-server.js' import { LocalDiscovery } from './discovery/local-discovery.js' +import { Capabilities } from './capabilities.js' +import NoiseSecretStream from '@hyperswarm/secret-stream' import { Logger } from './logger.js' /** @typedef {import("@mapeo/schema").ProjectSettingsValue} ProjectValue */ @@ -74,6 +76,7 @@ export class MapeoManager extends TypedEmitter { #invite #mediaServer #localDiscovery + #loggerBase #l /** @@ -96,7 +99,8 @@ export class MapeoManager extends TypedEmitter { super() this.#keyManager = new KeyManager(rootKey) this.#deviceId = getDeviceId(this.#keyManager) - this.#l = new Logger({ deviceId: this.#deviceId, ns: 'manager' }) + const logger = (this.#loggerBase = new Logger({ deviceId: this.#deviceId })) + this.#l = Logger.create('manager', logger) this.#dbFolder = dbFolder this.#projectMigrationsFolder = projectMigrationsFolder const sqlite = new Database( @@ -107,15 +111,20 @@ export class MapeoManager extends TypedEmitter { this.#db = drizzle(sqlite) migrate(this.#db, { migrationsFolder: clientMigrationsFolder }) - this.#localPeers = new LocalPeers({ logger: this.#l }) + this.#localPeers = new LocalPeers({ logger }) this.#localPeers.on('peers', (peers) => { this.emit('local-peers', omitPeerProtomux(peers)) }) + this.#localPeers.on('discovery-key', (dk) => { + if (this.#activeProjects.size === 0) { + this.#l.log('Received dk %h but no active projects', dk) + } + }) this.#projectSettingsIndexWriter = new IndexWriter({ tables: [projectSettingsTable], sqlite, - logger: this.#l, + logger, }) this.#activeProjects = new Map() @@ -152,9 +161,9 @@ export class MapeoManager extends TypedEmitter { this.#localDiscovery = new LocalDiscovery({ identityKeypair: this.#keyManager.getIdentityKeypair(), - logger: this.#l, + logger, }) - this.#localDiscovery.on('connection', this[kManagerReplicate].bind(this)) + this.#localDiscovery.on('connection', this.#replicate.bind(this)) } /** @@ -169,15 +178,25 @@ export class MapeoManager extends TypedEmitter { } /** - * Replicate Mapeo to a `@hyperswarm/secret-stream`. This replication connects - * the Mapeo RPC channel and allows invites. All active projects will sync - * automatically to this replication stream. Only use for local (trusted) - * connections, because the RPC channel key is public. To sync a specific - * project without connecting RPC, use project[kProjectReplication]. + * Create a Mapeo replication stream. This replication connects the Mapeo RPC + * channel and allows invites. All active projects will sync automatically to + * this replication stream. Only use for local (trusted) connections, because + * the RPC channel key is public. To sync a specific project without + * connecting RPC, use project[kProjectReplication]. * - * @param {import('@hyperswarm/secret-stream')} noiseStream + * @param {boolean} isInitiator */ - [kManagerReplicate](noiseStream) { + [kManagerReplicate](isInitiator) { + const noiseStream = new NoiseSecretStream(isInitiator, undefined, { + keyPair: this.#keyManager.getIdentityKeypair(), + }) + return this.#replicate(noiseStream) + } + + /** + * @param {NoiseSecretStream} noiseStream + */ + #replicate(noiseStream) { const replicationStream = this.#localPeers.connect(noiseStream) Promise.all([this.getDeviceInfo(), openedNoiseSecretStream(noiseStream)]) .then(([{ name }, openedNoiseStream]) => { @@ -367,7 +386,7 @@ export class MapeoManager extends TypedEmitter { sharedDb: this.#db, sharedIndexWriter: this.#projectSettingsIndexWriter, localPeers: this.#localPeers, - logger: this.#l, + logger: this.#loggerBase, getMediaBaseUrl: this.#mediaServer.getMediaAddress.bind( this.#mediaServer ), @@ -426,10 +445,18 @@ export class MapeoManager extends TypedEmitter { } /** + * Add a project to this device. After adding a project the client should + * await `project.$waitForInitialSync()` to ensure that the device has + * downloaded their proof of project membership and the project config. + * * @param {import('./generated/rpc.js').Invite} invite + * @param {{ waitForSync?: boolean }} [opts] For internal use in tests, set opts.waitForSync = false to not wait for sync during addProject() * @returns {Promise} */ - async addProject({ projectKey, encryptionKeys, projectInfo }) { + async addProject( + { projectKey, encryptionKeys, projectInfo }, + { waitForSync = true } = {} + ) { const projectPublicId = projectKeyToPublicId(projectKey) // 1. Check for an active project @@ -453,10 +480,9 @@ export class MapeoManager extends TypedEmitter { throw new Error(`Project with ID ${projectPublicId} already exists`) } - // TODO: Relies on completion of https://github.com/digidem/mapeo-core-next/issues/233 - // 3. Sync auth + config cores + // No awaits here - need to update table in same tick as the projectExists check - // 4. Update the project keys table + // 3. Update the project keys table this.#saveToProjectKeysTable({ projectId, projectPublicId, @@ -467,17 +493,108 @@ export class MapeoManager extends TypedEmitter { projectInfo, }) - // 5. Write device info into project - const deviceInfo = await this.getDeviceInfo() - - if (deviceInfo.name) { + // Any errors from here we need to remove project from db because it has not + // been fully added and synced + try { + // 4. Write device info into project const project = await this.getProject(projectPublicId) - await project[kSetOwnDeviceInfo]({ name: deviceInfo.name }) + await project.ready() + + try { + const deviceInfo = await this.getDeviceInfo() + if (deviceInfo.name) { + await project[kSetOwnDeviceInfo]({ name: deviceInfo.name }) + } + } catch (e) { + // Can ignore an error trying to write device info + this.#l.log( + 'ERROR: failed to write project %h deviceInfo %o', + projectKey, + e + ) + } + + // 5. Wait for initial project sync + if (waitForSync) { + await this.#waitForInitialSync(project) + } + + this.#activeProjects.set(projectPublicId, project) + } catch (e) { + this.#l.log('ERROR: could not add project', e) + this.#db + .delete(projectKeysTable) + .where(eq(projectKeysTable.projectId, projectId)) + .run() + throw e } - + this.#l.log('Added project %h, public ID: %S', projectKey, projectPublicId) return projectPublicId } + /** + * Sync initial data: the `auth` cores which contain the capability messages, + * and the `config` cores which contain the project name & custom config (if + * it exists). The API consumer should await this after `client.addProject()` + * to ensure that the device is fully added to the project. + * + * @param {MapeoProject} project + * @param {object} [opts] + * @param {number} [opts.timeoutMs=5000] Timeout in milliseconds for max time + * to wait between sync status updates before giving up. As long as syncing is + * happening, this will never timeout, but if more than timeoutMs passes + * without any sync activity, then this will resolve `false` e.g. data has not + * synced + * @returns {Promise} + */ + async #waitForInitialSync(project, { timeoutMs = 5000 } = {}) { + await project.ready() + const [capability, projectSettings] = await Promise.all([ + project.$getOwnCapabilities(), + project.$getProjectSettings(), + ]) + const { + auth: { localState: authState }, + config: { localState: configState }, + } = project.$sync.getState() + const isCapabilitySynced = capability !== Capabilities.NO_ROLE_CAPABILITIES + const isProjectSettingsSynced = + projectSettings !== MapeoProject.EMPTY_PROJECT_SETTINGS + // Assumes every project that someone is invited to has at least one record + // in the auth store - the capability record for the invited device + const isAuthSynced = authState.want === 0 && authState.have > 0 + // Assumes every project that someone is invited to has at least one record + // in the config store - defining the name of the project. + // TODO: Enforce adding a project name in the invite method + const isConfigSynced = configState.want === 0 && configState.have > 0 + if ( + isCapabilitySynced && + isProjectSettingsSynced && + isAuthSynced && + isConfigSynced + ) { + return true + } + return new Promise((resolve, reject) => { + /** @param {import('./sync/sync-state.js').State} syncState */ + const onSyncState = (syncState) => { + clearTimeout(timeoutId) + if (syncState.auth.dataToSync || syncState.config.dataToSync) { + timeoutId = setTimeout(onTimeout, timeoutMs) + return + } + project.$sync.off('sync-state', onSyncState) + resolve(this.#waitForInitialSync(project, { timeoutMs })) + } + const onTimeout = () => { + project.$sync.off('sync-state', onSyncState) + reject(new Error('Sync timeout')) + } + let timeoutId = setTimeout(onTimeout, timeoutMs) + project.$sync.on('sync-state', onSyncState) + }) + } + /** * @template {import('type-fest').Exact} T * @param {T} deviceInfo @@ -501,6 +618,7 @@ export class MapeoManager extends TypedEmitter { await project[kSetOwnDeviceInfo](deviceInfo) }) ) + this.#l.log('set device info %o', deviceInfo) } /** @@ -533,6 +651,15 @@ export class MapeoManager extends TypedEmitter { await this.#mediaServer.stop() } + async startLocalPeerDiscovery() { + return this.#localDiscovery.start() + } + + /** @type {LocalDiscovery['stop']} */ + async stopLocalPeerDiscovery(opts) { + return this.#localDiscovery.stop(opts) + } + /** * @returns {Promise} */ diff --git a/src/mapeo-project.js b/src/mapeo-project.js index 5d5852254..e3ada1a75 100644 --- a/src/mapeo-project.js +++ b/src/mapeo-project.js @@ -36,20 +36,21 @@ import { valueOf, } from './utils.js' import { MemberApi } from './member-api.js' -import { IconApi } from './icon-api.js' -import { SyncApi, kSyncReplicate } from './sync/sync-api.js' -import Hypercore from 'hypercore' +import { SyncApi, kHandleDiscoveryKey } from './sync/sync-api.js' import { Logger } from './logger.js' +import { IconApi } from './icon-api.js' /** @typedef {Omit} EditableProjectSettings */ const CORESTORE_STORAGE_FOLDER_NAME = 'corestore' const INDEXER_STORAGE_FOLDER_NAME = 'indexer' +export const kCoreManager = Symbol('coreManager') export const kCoreOwnership = Symbol('coreOwnership') export const kCapabilities = Symbol('capabilities') export const kSetOwnDeviceInfo = Symbol('kSetOwnDeviceInfo') export const kBlobStore = Symbol('blobStore') export const kProjectReplicate = Symbol('replicate project') +const EMPTY_PROJECT_SETTINGS = Object.freeze({}) export class MapeoProject { #projectId @@ -66,6 +67,8 @@ export class MapeoProject { #syncApi #l + static EMPTY_PROJECT_SETTINGS = EMPTY_PROJECT_SETTINGS + /** * @param {Object} opts * @param {string} opts.dbPath Path to store project sqlite db. Use `:memory:` for memory storage @@ -278,26 +281,25 @@ export class MapeoProject { logger: this.#l, }) - ///////// 4. Wire up sync + ///////// 4. Replicate local peers automatically // Replicate already connected local peers for (const peer of localPeers.peers) { if (peer.status !== 'connected') continue - this.#syncApi[kSyncReplicate](peer.protomux) + this.#coreManager.creatorCore.replicate(peer.protomux) } - localPeers.on('discovery-key', (discoveryKey, stream) => { - // The core identified by this discovery key might not be part of this - // project, but we can't know that so we will request it from the peer if - // we don't have it. The peer will not return the core key unless it _is_ - // part of this project - this.#coreManager.handleDiscoveryKey(discoveryKey, stream) - }) - // When a new peer is found, try to replicate (if it is not a member of the // project it will fail the capability check and be ignored) localPeers.on('peer-add', (peer) => { - this.#syncApi[kSyncReplicate](peer.protomux) + this.#coreManager.creatorCore.replicate(peer.protomux) + }) + + // This happens whenever a peer replicates a core to the stream. SyncApi + // handles replicating this core if we also have it, or requesting the key + // for the core. + localPeers.on('discovery-key', (discoveryKey, stream) => { + this.#syncApi[kHandleDiscoveryKey](discoveryKey, stream) }) ///////// 5. Write core ownership record @@ -324,6 +326,13 @@ export class MapeoProject { this.#l.log('Created project instance %h', projectKey) } + /** + * CoreManager instance, used for tests + */ + get [kCoreManager]() { + return this.#coreManager + } + /** * CoreOwnership instance, used for tests */ @@ -447,7 +456,7 @@ export class MapeoProject { ) } catch (e) { this.#l.log('No project settings') - return /** @type {EditableProjectSettings} */ ({}) + return /** @type {EditableProjectSettings} */ (EMPTY_PROJECT_SETTINGS) } } @@ -461,18 +470,21 @@ export class MapeoProject { * and only this project will replicate (to replicate multiple projects you * need to replicate the manager instance via manager[kManagerReplicate]) * - * @param {Exclude[0], boolean>} stream A duplex stream, a @hyperswarm/secret-stream, or a Protomux instance + * @param {Parameters[0]} stream A duplex stream, a @hyperswarm/secret-stream, or a Protomux instance * @returns */ [kProjectReplicate](stream) { - const replicationStream = Hypercore.createProtocolStream(stream, { + // @ts-expect-error - hypercore types need updating + const replicationStream = this.#coreManager.creatorCore.replicate(stream, { + // @ts-ignore - hypercore types do not currently include this option ondiscoverykey: async (discoveryKey) => { - this.#coreManager.handleDiscoveryKey(discoveryKey, replicationStream) + const protomux = + /** @type {import('protomux')} */ ( + replicationStream.noiseStream.userData + ) + this.#syncApi[kHandleDiscoveryKey](discoveryKey, protomux) }, }) - const protomux = replicationStream.noiseStream.userData - // @ts-ignore - got fed up jumping through hoops to keep TS heppy - this.#syncApi[kSyncReplicate](protomux) return replicationStream } diff --git a/src/sync/core-sync-state.js b/src/sync/core-sync-state.js index ec382656f..6872dd53c 100644 --- a/src/sync/core-sync-state.js +++ b/src/sync/core-sync-state.js @@ -93,10 +93,13 @@ export class CoreSyncState { if (this.#core) return this.#core = core - this.#localState.setHavesBitfield( - // @ts-ignore - internal property - core?.core?.bitfield - ) + + this.#core.ready().then(() => { + this.#localState.setHavesBitfield( + // @ts-ignore - internal property + core?.core?.bitfield + ) + }) for (const peer of this.#core.peers) { this.#onPeerAdd(peer) diff --git a/src/sync/namespace-sync-state.js b/src/sync/namespace-sync-state.js index 3ffd6769f..ff2dec969 100644 --- a/src/sync/namespace-sync-state.js +++ b/src/sync/namespace-sync-state.js @@ -2,7 +2,7 @@ import { CoreSyncState } from './core-sync-state.js' import { discoveryKey } from 'hypercore-crypto' /** - * @typedef {Omit} SyncState + * @typedef {Omit & { dataToSync: boolean, coreCount: number }} SyncState */ /** @@ -11,6 +11,7 @@ import { discoveryKey } from 'hypercore-crypto' export class NamespaceSyncState { /** @type {Map} */ #coreStates = new Map() + #coreCount = 0 #handleUpdate #namespace /** @type {SyncState | null} */ @@ -55,6 +56,8 @@ export class NamespaceSyncState { if (this.#cachedState) return this.#cachedState /** @type {SyncState} */ const state = { + dataToSync: false, + coreCount: this.#coreCount, localState: createState(), remoteStates: {}, } @@ -71,6 +74,9 @@ export class NamespaceSyncState { } } } + if (state.localState.want > 0 || state.localState.wanted > 0) { + state.dataToSync = true + } this.#cachedState = state return state } @@ -82,6 +88,7 @@ export class NamespaceSyncState { #addCore(core, coreKey) { const discoveryId = discoveryKey(coreKey).toString('hex') this.#getCoreState(discoveryId).attachCore(core) + this.#coreCount++ } /** diff --git a/src/sync/peer-sync-controller.js b/src/sync/peer-sync-controller.js index 306525b8e..d1d55aa09 100644 --- a/src/sync/peer-sync-controller.js +++ b/src/sync/peer-sync-controller.js @@ -10,7 +10,7 @@ import { Logger } from '../logger.js' */ /** @type {Namespace[]} */ -const PRESYNC_NAMESPACES = ['auth', 'config', 'blobIndex'] +export const PRESYNC_NAMESPACES = ['auth', 'config', 'blobIndex'] export class PeerSyncController { #replicatingCores = new Set() @@ -34,7 +34,7 @@ export class PeerSyncController { /** * @param {object} opts - * @param {import("protomux")} opts.protomux + * @param {import("protomux")} opts.protomux * @param {import("../core-manager/index.js").CoreManager} opts.coreManager * @param {import("./sync-state.js").SyncState} opts.syncState * @param {import("../capabilities.js").Capabilities} opts.capabilities @@ -55,8 +55,7 @@ export class PeerSyncController { this.#capabilities = capabilities // Always need to replicate the project creator core - coreManager.creatorCore.replicate(protomux) - this.#replicatingCores.add(coreManager.creatorCore) + this.#replicateCore(coreManager.creatorCore) coreManager.on('add-core', this.#handleAddCore) syncState.on('state', this.#handleStateChange) @@ -69,7 +68,11 @@ export class PeerSyncController { } get peerId() { - return this.peerKey?.toString('hex') + return this.peerKey.toString('hex') + } + + get syncCapability() { + return this.#syncCapability } /** @@ -91,6 +94,31 @@ export class PeerSyncController { this.#updateEnabledNamespaces() } + /** + * @param {Buffer} discoveryKey + */ + handleDiscoveryKey(discoveryKey) { + const coreRecord = this.#coreManager.getCoreByDiscoveryKey(discoveryKey) + // If we already know about this core, then we will add it to the + // replication stream when we are ready + if (coreRecord) { + this.#log( + 'Received discovery key %h, but already have core in namespace %s', + discoveryKey, + coreRecord.namespace + ) + if (this.#enabledNamespaces.has(coreRecord.namespace)) { + this.#replicateCore(coreRecord.core) + } + return + } + if (!this.peerKey) { + this.#log('Unexpected null peerKey') + return + } + this.#coreManager.requestCoreKey(this.peerKey, discoveryKey) + } + /** * Handler for 'core-add' event from CoreManager * Bound to `this` (defined as static property) @@ -113,13 +141,12 @@ export class PeerSyncController { // connected. We shouldn't get a state change before the noise stream has // connected, but if we do we can ignore it because we won't have any useful // information until it connects. - if (!this.#protomux.stream.remotePublicKey) return - const peerId = this.#protomux.stream.remotePublicKey.toString('hex') - this.#syncStatus = getSyncStatus(peerId, state) + if (!this.peerId) return + this.#syncStatus = getSyncStatus(this.peerId, state) const localState = mapObject(state, (ns, nsState) => { return [ns, nsState.localState] }) - this.#log('state %O', state) + this.#log('state %X', state) // Map of which namespaces have received new data since last sync change const didUpdate = mapObject(state, (ns) => { @@ -139,12 +166,12 @@ export class PeerSyncController { if (didUpdate.auth) { try { - const cap = await this.#capabilities.getCapabilities(peerId) + const cap = await this.#capabilities.getCapabilities(this.peerId) this.#syncCapability = cap.sync } catch (e) { this.#log('Error reading capability', e) - // Any error, consider sync blocked - this.#syncCapability = createNamespaceMap('blocked') + // Any error, consider sync unknown + this.#syncCapability = createNamespaceMap('unknown') } } this.#log('capability %o', this.#syncCapability) @@ -197,7 +224,12 @@ export class PeerSyncController { */ #replicateCore(core) { if (this.#replicatingCores.has(core)) return + this.#log('replicating core %k', core.key) core.replicate(this.#protomux) + core.on('peer-remove', (peer) => { + if (!peer.remotePublicKey.equals(this.peerKey)) return + this.#log('peer-remove %h from core %k', peer.remotePublicKey, core.key) + }) this.#replicatingCores.add(core) } @@ -215,32 +247,13 @@ export class PeerSyncController { this.#replicatingCores.delete(core) } - /** - * @param {import('hypercore')<'binary', any>} core - */ - #downloadCore(core) { - if (this.#downloadingRanges.has(core)) return - const range = core.download({ start: 0, end: -1 }) - this.#downloadingRanges.set(core, range) - } - - /** - * @param {import('hypercore')<'binary', any>} core - */ - #undownloadCore(core) { - const range = this.#downloadingRanges.get(core) - if (!range) return - range.destroy() - this.#downloadingRanges.delete(core) - } - /** * @param {Namespace} namespace */ #enableNamespace(namespace) { + if (this.#enabledNamespaces.has(namespace)) return for (const { core } of this.#coreManager.getCores(namespace)) { this.#replicateCore(core) - this.#downloadCore(core) } this.#enabledNamespaces.add(namespace) this.#log('enabled namespace %s', namespace) @@ -250,9 +263,9 @@ export class PeerSyncController { * @param {Namespace} namespace */ #disableNamespace(namespace) { + if (!this.#enabledNamespaces.has(namespace)) return for (const { core } of this.#coreManager.getCores(namespace)) { this.#unreplicateCore(core) - this.#undownloadCore(core) } this.#enabledNamespaces.delete(namespace) this.#log('disabled namespace %s', namespace) diff --git a/src/sync/sync-api.js b/src/sync/sync-api.js index 5eaf685db..018de6e9b 100644 --- a/src/sync/sync-api.js +++ b/src/sync/sync-api.js @@ -1,9 +1,14 @@ import { TypedEmitter } from 'tiny-typed-emitter' import { SyncState } from './sync-state.js' -import { PeerSyncController } from './peer-sync-controller.js' +import { + PeerSyncController, + PRESYNC_NAMESPACES, +} from './peer-sync-controller.js' import { Logger } from '../logger.js' +import { NAMESPACES } from '../core-manager/index.js' +import { keyToId } from '../utils.js' -export const kSyncReplicate = Symbol('replicate sync') +export const kHandleDiscoveryKey = Symbol('handle discovery key') /** * @typedef {object} SyncEvents @@ -19,8 +24,12 @@ export class SyncApi extends TypedEmitter { #capabilities /** @type {Map} */ #peerSyncControllers = new Map() + /** @type {Set} */ + #peerIds = new Set() /** @type {Set<'local' | 'remote'>} */ #dataSyncEnabled = new Set() + /** @type {Map>} */ + #pendingDiscoveryKeys = new Map() #l /** @@ -37,7 +46,39 @@ export class SyncApi extends TypedEmitter { this.#coreManager = coreManager this.#capabilities = capabilities this.syncState = new SyncState({ coreManager, throttleMs }) + this.syncState.setMaxListeners(0) this.syncState.on('state', this.emit.bind(this, 'sync-state')) + + this.#coreManager.creatorCore.on('peer-add', this.#handlePeerAdd) + this.#coreManager.creatorCore.on('peer-remove', this.#handlePeerRemove) + } + + /** @type {import('../local-peers.js').LocalPeersEvents['discovery-key']} */ + [kHandleDiscoveryKey](discoveryKey, protomux) { + const peerSyncController = this.#peerSyncControllers.get(protomux) + if (peerSyncController) { + peerSyncController.handleDiscoveryKey(discoveryKey) + return + } + // We will reach here if we are not part of the project, so we can ignore + // these keys. However it's also possible to reach here when we are part of + // a project, but the creator core `peer-add` event has not yet fired, so we + // queue this to be handled in `#handlePeerAdd` + const peerQueue = this.#pendingDiscoveryKeys.get(protomux) || new Set() + peerQueue.add(discoveryKey) + this.#pendingDiscoveryKeys.set(protomux, peerQueue) + + // If we _are_ part of the project, the `peer-add` should happen very soon + // after we get a discovery-key event, so we cleanup our queue to avoid + // memory leaks for any discovery keys that have not been handled. + setTimeout(() => { + const peerQueue = this.#pendingDiscoveryKeys.get(protomux) + if (!peerQueue) return + peerQueue.delete(discoveryKey) + if (peerQueue.size === 0) { + this.#pendingDiscoveryKeys.delete(protomux) + } + }, 500) } getState() { @@ -69,9 +110,33 @@ export class SyncApi extends TypedEmitter { } /** - * @param {import('protomux')} protomux A protomux instance + * @param {'initial' | 'full'} type + */ + async waitForSync(type) { + const state = this.getState() + const namespaces = type === 'initial' ? PRESYNC_NAMESPACES : NAMESPACES + if (isSynced(state, namespaces, this.#peerSyncControllers)) return + return new Promise((res) => { + this.on('sync-state', function onState(state) { + if (!isSynced(state, namespaces, this.#peerSyncControllers)) return + this.off('sync-state', onState) + res(null) + }) + }) + } + + /** + * Bound to `this` + * + * This will be called whenever a peer is successfully added to the creator + * core, which means that the peer has the project key. The PeerSyncController + * will then handle validation of role records to ensure that the peer is + * actually still part of the project. + * + * @param {{ protomux: import('protomux') }} peer */ - [kSyncReplicate](protomux) { + #handlePeerAdd = (peer) => { + const { protomux } = peer if (this.#peerSyncControllers.has(protomux)) { this.#l.log( 'Unexpected existing peer sync controller for peer %h', @@ -79,7 +144,6 @@ export class SyncApi extends TypedEmitter { ) return } - const peerSyncController = new PeerSyncController({ protomux, coreManager: this.#coreManager, @@ -87,9 +151,61 @@ export class SyncApi extends TypedEmitter { capabilities: this.#capabilities, logger: this.#l, }) + this.#peerSyncControllers.set(protomux, peerSyncController) + if (peerSyncController.peerId) this.#peerIds.add(peerSyncController.peerId) + if (this.#dataSyncEnabled.has('local')) { peerSyncController.enableDataSync() } - this.#peerSyncControllers.set(protomux, peerSyncController) + + const peerQueue = this.#pendingDiscoveryKeys.get(protomux) + if (peerQueue) { + for (const discoveryKey of peerQueue) { + peerSyncController.handleDiscoveryKey(discoveryKey) + } + this.#pendingDiscoveryKeys.delete(protomux) + } + } + + /** + * Bound to `this` + * + * Called when a peer is removed from the creator core, e.g. when the + * connection is terminated. + * + * @param {{ protomux: import('protomux'), remotePublicKey: Buffer }} peer + */ + #handlePeerRemove = (peer) => { + const { protomux } = peer + if (!this.#peerSyncControllers.has(protomux)) { + this.#l.log( + 'Unexpected no existing peer sync controller for peer %h', + protomux.stream.remotePublicKey + ) + return + } + this.#peerSyncControllers.delete(protomux) + this.#peerIds.delete(keyToId(peer.remotePublicKey)) + this.#pendingDiscoveryKeys.delete(protomux) + } +} + +/** + * Is the sync state "synced", e.g. is there nothing left to sync + * + * @param {import('./sync-state.js').State} state + * @param {readonly import('../core-manager/index.js').Namespace[]} namespaces + * @param {Map} peerSyncControllers + */ +function isSynced(state, namespaces, peerSyncControllers) { + for (const ns of namespaces) { + if (state[ns].dataToSync) return false + for (const psc of peerSyncControllers.values()) { + const { peerId } = psc + if (psc.syncCapability[ns] === 'blocked') continue + if (!(peerId in state[ns].remoteStates)) return false + if (state[ns].remoteStates[peerId].status === 'connecting') return false + } } + return true } diff --git a/src/utils.js b/src/utils.js index 07fc064ea..1da1b0d5d 100644 --- a/src/utils.js +++ b/src/utils.js @@ -85,7 +85,7 @@ export function deNullify(obj) { } /** - * @template {import('@mapeo/schema').MapeoDoc & { forks: string[] }} T + * @template {import('@mapeo/schema').MapeoDoc & { forks?: string[] }} T * @param {T} doc * @returns {Omit} */ diff --git a/test-e2e/capabilities.js b/test-e2e/capabilities.js index 2acdf3f7a..984ff049c 100644 --- a/test-e2e/capabilities.js +++ b/test-e2e/capabilities.js @@ -58,10 +58,13 @@ test('New device without capabilities', async (t) => { coreStorage: () => new RAM(), }) - const projectId = await manager.addProject({ - projectKey: randomBytes(32), - encryptionKeys: { auth: randomBytes(32) }, - }) + const projectId = await manager.addProject( + { + projectKey: randomBytes(32), + encryptionKeys: { auth: randomBytes(32) }, + }, + { waitForSync: false } + ) const project = await manager.getProject(projectId) await project.ready() @@ -136,10 +139,13 @@ test('getMany() - on newly invited device before sync', async (t) => { coreStorage: () => new RAM(), }) - const projectId = await manager.addProject({ - projectKey: randomBytes(32), - encryptionKeys: { auth: randomBytes(32) }, - }) + const projectId = await manager.addProject( + { + projectKey: randomBytes(32), + encryptionKeys: { auth: randomBytes(32) }, + }, + { waitForSync: false } + ) const project = await manager.getProject(projectId) await project.ready() diff --git a/test-e2e/device-info.js b/test-e2e/device-info.js index eb3860562..0ac3db0c4 100644 --- a/test-e2e/device-info.js +++ b/test-e2e/device-info.js @@ -64,10 +64,13 @@ test('device info written to projects', (t) => { await manager.setDeviceInfo({ name: 'mapeo' }) - const projectId = await manager.addProject({ - projectKey: randomBytes(32), - encryptionKeys: { auth: randomBytes(32) }, - }) + const projectId = await manager.addProject( + { + projectKey: randomBytes(32), + encryptionKeys: { auth: randomBytes(32) }, + }, + { waitForSync: false } + ) const project = await manager.getProject(projectId) diff --git a/test-e2e/manager-basic.js b/test-e2e/manager-basic.js index 274fd5ed3..9fee8537a 100644 --- a/test-e2e/manager-basic.js +++ b/test-e2e/manager-basic.js @@ -122,17 +122,23 @@ test('Managing added projects', async (t) => { coreStorage: () => new RAM(), }) - const project1Id = await manager.addProject({ - projectKey: KeyManager.generateProjectKeypair().publicKey, - encryptionKeys: { auth: randomBytes(32) }, - projectInfo: { name: 'project 1' }, - }) + const project1Id = await manager.addProject( + { + projectKey: KeyManager.generateProjectKeypair().publicKey, + encryptionKeys: { auth: randomBytes(32) }, + projectInfo: { name: 'project 1' }, + }, + { waitForSync: false } + ) - const project2Id = await manager.addProject({ - projectKey: KeyManager.generateProjectKeypair().publicKey, - encryptionKeys: { auth: randomBytes(32) }, - projectInfo: { name: 'project 2' }, - }) + const project2Id = await manager.addProject( + { + projectKey: KeyManager.generateProjectKeypair().publicKey, + encryptionKeys: { auth: randomBytes(32) }, + projectInfo: { name: 'project 2' }, + }, + { waitForSync: false } + ) t.test('initial information from listed projects', async (st) => { const listedProjects = await manager.listProjects() @@ -194,11 +200,14 @@ test('Managing both created and added projects', async (t) => { name: 'created project', }) - const addedProjectId = await manager.addProject({ - projectKey: KeyManager.generateProjectKeypair().publicKey, - encryptionKeys: { auth: randomBytes(32) }, - projectInfo: { name: 'added project' }, - }) + const addedProjectId = await manager.addProject( + { + projectKey: KeyManager.generateProjectKeypair().publicKey, + encryptionKeys: { auth: randomBytes(32) }, + projectInfo: { name: 'added project' }, + }, + { waitForSync: false } + ) const listedProjects = await manager.listProjects() @@ -263,11 +272,14 @@ test('Consistent storage folders', async (t) => { }) for (let i = 0; i < 10; i++) { - const projectId = await manager.addProject({ - projectKey: randomBytesSeed('test' + i), - encryptionKeys: { auth: randomBytes(32) }, - projectInfo: {}, - }) + const projectId = await manager.addProject( + { + projectKey: randomBytesSeed('test' + i), + encryptionKeys: { auth: randomBytes(32) }, + projectInfo: {}, + }, + { waitForSync: false } + ) await manager.getProject(projectId) } diff --git a/test-e2e/manager-invite.js b/test-e2e/manager-invite.js index 1b92df406..83750410b 100644 --- a/test-e2e/manager-invite.js +++ b/test-e2e/manager-invite.js @@ -1,179 +1,181 @@ -import { test } from 'brittle' -import { KeyManager } from '@mapeo/crypto' -import pDefer from 'p-defer' -import RAM from 'random-access-memory' -import { MEMBER_ROLE_ID } from '../src/capabilities.js' +import { test, skip } from 'brittle' +import { COORDINATOR_ROLE_ID, MEMBER_ROLE_ID } from '../src/capabilities.js' import { InviteResponse_Decision } from '../src/generated/rpc.js' -import { MapeoManager, kRPC } from '../src/mapeo-manager.js' -import { replicate } from '../tests/helpers/local-peers.js' - -const projectMigrationsFolder = new URL('../drizzle/project', import.meta.url) - .pathname -const clientMigrationsFolder = new URL('../drizzle/client', import.meta.url) - .pathname +import { once } from 'node:events' +import { + connectPeers, + createManagers, + disconnectPeers, + waitForPeers, +} from './utils.js' test('member invite accepted', async (t) => { - t.plan(10) - - const deferred = pDefer() - - const creator = new MapeoManager({ - rootKey: KeyManager.generateRootKey(), - projectMigrationsFolder, - clientMigrationsFolder, - dbFolder: ':memory:', - coreStorage: () => new RAM(), - }) - - await creator.setDeviceInfo({ name: 'Creator' }) + const [creator, joiner] = await createManagers(2) + connectPeers([creator, joiner]) + await waitForPeers([creator, joiner]) const createdProjectId = await creator.createProject({ name: 'Mapeo' }) const creatorProject = await creator.getProject(createdProjectId) - creator[kRPC].on('peers', async (peers) => { - t.is(peers.length, 1) - - const response = await creatorProject.$member.invite(peers[0].deviceId, { - roleId: MEMBER_ROLE_ID, - }) - - t.is(response, InviteResponse_Decision.ACCEPT) - deferred.resolve() - }) - - /** @type {string | undefined} */ - let expectedInvitorPeerId - - const joiner = new MapeoManager({ - rootKey: KeyManager.generateRootKey(), - projectMigrationsFolder, - clientMigrationsFolder, - dbFolder: ':memory:', - coreStorage: () => new RAM(), - }) - - await joiner.setDeviceInfo({ name: 'Joiner' }) - - t.exception( + await t.exception( async () => joiner.getProject(createdProjectId), 'joiner cannot get project instance before being invited and added to project' ) - joiner[kRPC].on('peers', (peers) => { - t.is(peers.length, 1) - expectedInvitorPeerId = peers[0].deviceId + const responsePromise = creatorProject.$member.invite(joiner.deviceId, { + roleId: MEMBER_ROLE_ID, }) + const [invite] = await once(joiner.invite, 'invite-received') + t.is(invite.projectId, createdProjectId, 'projectId of invite matches') + t.is(invite.peerId, creator.deviceId, 'deviceId of invite matches') + t.is(invite.projectName, 'Mapeo', 'project name of invite matches') - joiner.invite.on('invite-received', async (invite) => { - t.is(invite.projectId, createdProjectId) - t.is(invite.peerId, expectedInvitorPeerId) - t.is(invite.projectName, 'Mapeo') - // TODO: Check role being invited for (needs https://github.com/digidem/mapeo-core-next/issues/275) + await joiner.invite.accept(invite.projectId) - await joiner.invite.accept(invite.projectId) - }) - - replicate(creator[kRPC], joiner[kRPC]) - - await deferred.promise + t.is( + await responsePromise, + InviteResponse_Decision.ACCEPT, + 'correct invite response' + ) /// After invite flow has completed... - const joinerListedProjects = await joiner.listProjects() - - t.is(joinerListedProjects.length, 1, 'project added to joiner') t.alike( - joinerListedProjects[0], - { - name: 'Mapeo', - projectId: createdProjectId, - createdAt: undefined, - updatedAt: undefined, - }, + await joiner.listProjects(), + await creator.listProjects(), 'project info recorded in joiner successfully' ) - const joinerProject = await joiner.getProject( - joinerListedProjects[0].projectId - ) + const joinerProject = await joiner.getProject(createdProjectId) - t.ok(joinerProject, 'can create joiner project instance') + t.alike( + await joinerProject.$getProjectSettings(), + await creatorProject.$getProjectSettings(), + 'Project settings match' + ) - // TODO: Get project settings of joiner and ensure they're similar to creator's project's settings - // const joinerProjectSettings = await joinerProject.$getProjectSettings() - // t.alike(joinerProjectSettings, { defaultPresets: undefined, name: 'Mapeo' }) + t.alike( + await creatorProject.$member.getMany(), + await joinerProject.$member.getMany(), + 'Project members match' + ) - // TODO: Get members of creator project and assert info matches joiner - // const creatorProjectMembers = await creatorProject.$member.getMany() - // t.is(creatorProjectMembers.length, 1) - // t.alike(creatorProjectMembers[0], await joiner.getDeviceInfo()) + await disconnectPeers([creator, joiner]) }) -test('member invite rejected', async (t) => { - t.plan(9) +test('chain of invites', async (t) => { + const managers = await createManagers(4) + const [creator, ...joiners] = managers + connectPeers(managers) + await waitForPeers(managers) + + const createdProjectId = await creator.createProject({ name: 'Mapeo' }) - const deferred = pDefer() + let invitor = creator + for (const joiner of joiners) { + const invitorProject = await invitor.getProject(createdProjectId) + const responsePromise = invitorProject.$member.invite(joiner.deviceId, { + roleId: COORDINATOR_ROLE_ID, + }) + const [invite] = await once(joiner.invite, 'invite-received') + await joiner.invite.accept(invite.projectId) + t.is( + await responsePromise, + InviteResponse_Decision.ACCEPT, + 'correct invite response' + ) + } - const creator = new MapeoManager({ - rootKey: KeyManager.generateRootKey(), - projectMigrationsFolder, - clientMigrationsFolder, - dbFolder: ':memory:', - coreStorage: () => new RAM(), - }) + /// After invite flow has completed... + + const creatorProject = await creator.getProject(createdProjectId) + const expectedProjectSettings = await creatorProject.$getProjectSettings() + const expectedMembers = await creatorProject.$member.getMany() + + for (const joiner of joiners) { + const joinerProject = await joiner.getProject(createdProjectId) + + t.alike( + await joinerProject.$getProjectSettings(), + expectedProjectSettings, + 'Project settings match' + ) + + const joinerMembers = await joinerProject.$member.getMany() + t.alike( + joinerMembers.sort(memberSort), + expectedMembers.sort(memberSort), + 'Project members match' + ) + } + + await disconnectPeers(managers) +}) - await creator.setDeviceInfo({ name: 'Creator' }) +// TODO: Needs fix to inviteApi to check capabilities before sending invite +skip("member can't invite", async (t) => { + const managers = await createManagers(3) + const [creator, member, joiner] = managers + connectPeers(managers) + await waitForPeers(managers) const createdProjectId = await creator.createProject({ name: 'Mapeo' }) const creatorProject = await creator.getProject(createdProjectId) - creator[kRPC].on('peers', async (peers) => { - t.is(peers.length, 1) + const responsePromise = creatorProject.$member.invite(member.deviceId, { + roleId: MEMBER_ROLE_ID, + }) + const [invite] = await once(member.invite, 'invite-received') + await member.invite.accept(invite.projectId) + await responsePromise - const response = await creatorProject.$member.invite(peers[0].deviceId, { - roleId: MEMBER_ROLE_ID, - }) + /// After invite flow has completed... - t.is(response, InviteResponse_Decision.REJECT) + const memberProject = await member.getProject(createdProjectId) - deferred.resolve() - }) + t.alike( + await memberProject.$getProjectSettings(), + await creatorProject.$getProjectSettings(), + 'Project settings match' + ) - /** @type {string | undefined} */ - let expectedInvitorPeerId + const exceptionPromise = t.exception(() => + memberProject.$member.invite(joiner.deviceId, { roleId: MEMBER_ROLE_ID }) + ) + joiner.invite.once('invite-received', () => t.fail('should not send invite')) + await exceptionPromise - const joiner = new MapeoManager({ - rootKey: KeyManager.generateRootKey(), - projectMigrationsFolder, - clientMigrationsFolder, - dbFolder: ':memory:', - coreStorage: () => new RAM(), - }) + await disconnectPeers(managers) +}) - await joiner.setDeviceInfo({ name: 'Joiner' }) +test('member invite rejected', async (t) => { + const [creator, joiner] = await createManagers(2) + connectPeers([creator, joiner]) + await waitForPeers([creator, joiner]) - t.exception( + const createdProjectId = await creator.createProject({ name: 'Mapeo' }) + const creatorProject = await creator.getProject(createdProjectId) + + await t.exception( async () => joiner.getProject(createdProjectId), 'joiner cannot get project instance before being invited and added to project' ) - joiner[kRPC].on('peers', (peers) => { - t.is(peers.length, 1) - expectedInvitorPeerId = peers[0].deviceId + const responsePromise = creatorProject.$member.invite(joiner.deviceId, { + roleId: MEMBER_ROLE_ID, }) + const [invite] = await once(joiner.invite, 'invite-received') + t.is(invite.projectId, createdProjectId, 'projectId of invite matches') + t.is(invite.peerId, creator.deviceId, 'deviceId of invite matches') + t.is(invite.projectName, 'Mapeo', 'project name of invite matches') - joiner.invite.on('invite-received', async (invite) => { - t.is(invite.projectId, createdProjectId) - t.is(invite.peerId, expectedInvitorPeerId) - t.is(invite.projectName, 'Mapeo') - // TODO: Check role being invited for (needs https://github.com/digidem/mapeo-core-next/issues/275) + await joiner.invite.reject(invite.projectId) - await joiner.invite.reject(invite.projectId) - }) - - replicate(creator[kRPC], joiner[kRPC]) - - await deferred.promise + t.is( + await responsePromise, + InviteResponse_Decision.REJECT, + 'correct invite response' + ) /// After invite flow has completed... @@ -186,7 +188,21 @@ test('member invite rejected', async (t) => { 'joiner cannot get project instance' ) - // TODO: Get members of creator project and assert joiner not added - // const creatorProjectMembers = await creatorProject.$member.getMany() - // t.is(creatorProjectMembers.length, 0) + t.is( + (await creatorProject.$member.getMany()).length, + 1, + 'Only 1 member in project still' + ) + + await disconnectPeers([creator, joiner]) }) + +/** + * @param {import('../src/member-api.js').MemberInfo} a + * @param {import('../src/member-api.js').MemberInfo} b + */ +function memberSort(a, b) { + if (a.deviceId < b.deviceId) return -1 + if (a.deviceId > b.deviceId) return 1 + return 0 +} diff --git a/test-e2e/members.js b/test-e2e/members.js index 2d4b6b653..2934a531a 100644 --- a/test-e2e/members.js +++ b/test-e2e/members.js @@ -1,27 +1,25 @@ +// @ts-check import { test } from 'brittle' -import RAM from 'random-access-memory' -import { KeyManager } from '@mapeo/crypto' -import pDefer from 'p-defer' import { randomBytes } from 'crypto' -import { MapeoManager, kRPC } from '../src/mapeo-manager.js' import { CREATOR_CAPABILITIES, DEFAULT_CAPABILITIES, MEMBER_ROLE_ID, NO_ROLE_CAPABILITIES, } from '../src/capabilities.js' -import { replicate } from '../tests/helpers/local-peers.js' - -const projectMigrationsFolder = new URL('../drizzle/project', import.meta.url) - .pathname -const clientMigrationsFolder = new URL('../drizzle/client', import.meta.url) - .pathname +import { + connectPeers, + createManagers, + disconnectPeers, + invite, + waitForPeers, +} from './utils.js' test('getting yourself after creating project', async (t) => { - const { manager } = setup() + const [manager] = await createManagers(1) - await manager.setDeviceInfo({ name: 'mapeo' }) + const deviceInfo = await manager.getDeviceInfo() const project = await manager.getProject(await manager.createProject()) await project.ready() @@ -31,7 +29,7 @@ test('getting yourself after creating project', async (t) => { me, { deviceId: project.deviceId, - name: 'mapeo', + name: deviceInfo.name, capabilities: CREATOR_CAPABILITIES, }, 'has expected member info with creator capabilities' @@ -44,22 +42,25 @@ test('getting yourself after creating project', async (t) => { members[0], { deviceId: project.deviceId, - name: 'mapeo', + name: deviceInfo.name, capabilities: CREATOR_CAPABILITIES, }, 'has expected member info with creator capabilities' ) }) -test('getting yourself after being invited to project (but not yet synced)', async (t) => { - const { manager } = setup() +test('getting yourself after adding project (but not yet synced)', async (t) => { + const [manager] = await createManagers(1) - await manager.setDeviceInfo({ name: 'mapeo' }) + const deviceInfo = await manager.getDeviceInfo() const project = await manager.getProject( - await manager.addProject({ - projectKey: randomBytes(32), - encryptionKeys: { auth: randomBytes(32) }, - }) + await manager.addProject( + { + projectKey: randomBytes(32), + encryptionKeys: { auth: randomBytes(32) }, + }, + { waitForSync: false } + ) ) await project.ready() @@ -69,7 +70,7 @@ test('getting yourself after being invited to project (but not yet synced)', asy me, { deviceId: project.deviceId, - name: 'mapeo', + name: deviceInfo.name, capabilities: NO_ROLE_CAPABILITIES, }, 'has expected member info with no role capabilities' @@ -82,7 +83,7 @@ test('getting yourself after being invited to project (but not yet synced)', asy members[0], { deviceId: project.deviceId, - name: 'mapeo', + name: deviceInfo.name, capabilities: NO_ROLE_CAPABILITIES, }, 'has expected member info with no role capabilities' @@ -90,19 +91,24 @@ test('getting yourself after being invited to project (but not yet synced)', asy }) test('getting invited member after invite rejected', async (t) => { - const { manager, simulateMemberInvite } = setup() + const managers = await createManagers(2) + const [invitor, invitee] = managers + connectPeers(managers) + await waitForPeers(managers) - await manager.setDeviceInfo({ name: 'mapeo' }) - const project = await manager.getProject(await manager.createProject()) + const projectId = await invitor.createProject() + const project = await invitor.getProject(projectId) await project.ready() - const invitedDeviceId = await simulateMemberInvite(project, 'reject', { - deviceInfo: { name: 'member' }, - roleId: MEMBER_ROLE_ID, + await invite({ + invitor, + projectId, + invitees: [invitee], + reject: true, }) await t.exception( - () => project.$member.getById(invitedDeviceId), + () => project.$member.getById(invitee.deviceId), 'invited member cannot be retrieved' ) @@ -110,107 +116,46 @@ test('getting invited member after invite rejected', async (t) => { t.is(members.length, 1) t.absent( - members.find((m) => m.deviceId === invitedDeviceId), + members.find((m) => m.deviceId === invitee.deviceId), 'invited member not found' ) + await disconnectPeers(managers) }) test('getting invited member after invite accepted', async (t) => { - const { manager, simulateMemberInvite } = setup() - - await manager.setDeviceInfo({ name: 'mapeo' }) - const project = await manager.getProject(await manager.createProject()) + const managers = await createManagers(2) + const [invitor, invitee] = managers + connectPeers(managers) + await waitForPeers(managers) + + const { name: inviteeName } = await invitee.getDeviceInfo() + const projectId = await invitor.createProject() + const project = await invitor.getProject(projectId) await project.ready() - const invitedDeviceId = await simulateMemberInvite(project, 'accept', { - deviceInfo: { name: 'member' }, + await invite({ + invitor, + projectId, + invitees: [invitee], roleId: MEMBER_ROLE_ID, }) - // Before syncing - { - const invitedMember = await project.$member.getById(invitedDeviceId) - - t.alike( - invitedMember, - { - deviceId: invitedDeviceId, - capabilities: DEFAULT_CAPABILITIES[MEMBER_ROLE_ID], - }, - 'has expected member info with member capabilities' - ) - } - - { - const members = await project.$member.getMany() + const members = await project.$member.getMany() - t.is(members.length, 2) + t.is(members.length, 2) - const invitedMember = members.find((m) => m.deviceId === invitedDeviceId) + const invitedMember = members.find((m) => m.deviceId === invitee.deviceId) - t.alike( - invitedMember, - { - deviceId: invitedDeviceId, - capabilities: DEFAULT_CAPABILITIES[MEMBER_ROLE_ID], - }, - 'has expected member info with member capabilities' - ) - } + t.alike( + invitedMember, + { + deviceId: invitee.deviceId, + name: inviteeName, + capabilities: DEFAULT_CAPABILITIES[MEMBER_ROLE_ID], + }, + 'has expected member info with member capabilities' + ) // TODO: Test that device info of invited member can be read from invitor after syncing + await disconnectPeers(managers) }) - -function setup() { - const manager = new MapeoManager({ - rootKey: KeyManager.generateRootKey(), - projectMigrationsFolder, - clientMigrationsFolder, - dbFolder: ':memory:', - coreStorage: () => new RAM(), - }) - - /** - * - * @param {import('../src/mapeo-project.js').MapeoProject} project - * @param {'accept' | 'reject'} respondWith - * @param {{ deviceInfo: import('../src/generated/rpc.js').DeviceInfo, roleId: import('../src/capabilities.js').RoleId }} mocked - * - */ - async function simulateMemberInvite( - project, - respondWith, - { deviceInfo, roleId } - ) { - /** @type {import('p-defer').DeferredPromise} */ - const deferred = pDefer() - - const otherManager = new MapeoManager({ - rootKey: KeyManager.generateRootKey(), - projectMigrationsFolder, - clientMigrationsFolder, - dbFolder: ':memory:', - coreStorage: () => new RAM(), - }) - - await otherManager.setDeviceInfo(deviceInfo) - - otherManager.invite.on('invite-received', ({ projectId }) => { - otherManager.invite[respondWith](projectId).catch(deferred.reject) - }) - - manager[kRPC].on('peers', (peers) => { - const deviceId = peers[0].deviceId - project.$member - .invite(deviceId, { roleId }) - .then(() => deferred.resolve(deviceId)) - .catch(deferred.reject) - }) - - replicate(manager[kRPC], otherManager[kRPC]) - - return deferred.promise - } - - return { manager, simulateMemberInvite } -} diff --git a/test-e2e/project-crud.js b/test-e2e/project-crud.js index 53ae1b2bf..6c25708ae 100644 --- a/test-e2e/project-crud.js +++ b/test-e2e/project-crud.js @@ -4,6 +4,8 @@ import { KeyManager } from '@mapeo/crypto' import { valueOf } from '../src/utils.js' import { MapeoManager } from '../src/mapeo-manager.js' import RAM from 'random-access-memory' +import { stripUndef } from './utils.js' +import { round } from './utils.js' /** @satisfies {Array} */ const fixtures = [ @@ -130,20 +132,3 @@ test('CRUD operations', async (t) => { }) } }) - -/** - * Remove undefined properties from an object, to allow deep comparison - * @param {object} obj - */ -function stripUndef(obj) { - return JSON.parse(JSON.stringify(obj)) -} - -/** - * - * @param {number} value - * @param {number} decimalPlaces - */ -function round(value, decimalPlaces) { - return Math.round(value * 10 ** decimalPlaces) / 10 ** decimalPlaces -} diff --git a/test-e2e/sync.js b/test-e2e/sync.js new file mode 100644 index 000000000..20abb7e58 --- /dev/null +++ b/test-e2e/sync.js @@ -0,0 +1,256 @@ +import { test } from 'brittle' +import { + connectPeers, + createManagers, + invite, + seedDatabases, + sortById, + waitForSync, +} from './utils.js' +import { kCoreManager } from '../src/mapeo-project.js' +import { getKeys } from '../tests/helpers/core-manager.js' +import { NAMESPACES } from '../src/core-manager/index.js' +import { PRESYNC_NAMESPACES } from '../src/sync/peer-sync-controller.js' +import { generate } from '@mapeo/mock-data' +import { valueOf } from '../src/utils.js' +import pTimeout from 'p-timeout' +import { BLOCKED_ROLE_ID, COORDINATOR_ROLE_ID } from '../src/capabilities.js' + +const SCHEMAS_INITIAL_SYNC = ['preset', 'field'] + +test('Create and sync data', async function (t) { + const COUNT = 5 + const managers = await createManagers(COUNT) + const [invitor, ...invitees] = managers + const disconnect = connectPeers(managers, { discovery: false }) + const projectId = await invitor.createProject() + await invite({ invitor, invitees, projectId }) + await disconnect() + + const projects = await Promise.all( + managers.map((m) => m.getProject(projectId)) + ) + + const generatedDocs = (await seedDatabases(projects)).flat() + t.pass(`Generated ${generatedDocs.length} values`) + const generatedSchemaNames = generatedDocs.reduce((acc, cur) => { + acc.add(cur.schemaName) + return acc + }, new Set()) + + connectPeers(managers, { discovery: false }) + await waitForSync(projects, 'initial') + + for (const schemaName of generatedSchemaNames) { + for (const project of projects) { + const deviceId = project.deviceId.slice(0, 7) + // @ts-ignore - to complex to narrow `schemaName` to valid values + const docs = await project[schemaName].getMany() + const expected = generatedDocs.filter((v) => v.schemaName === schemaName) + if (SCHEMAS_INITIAL_SYNC.includes(schemaName)) { + t.alike( + sortById(docs), + sortById(expected), + `All ${schemaName} docs synced to ${deviceId}` + ) + } else { + t.not( + docs.length, + expected.length, + `Not all ${schemaName} docs synced to ${deviceId}` + ) + } + } + } + + for (const project of projects) { + project.$sync.start() + } + + await waitForSync(projects, 'full') + + for (const schemaName of generatedSchemaNames) { + for (const project of projects) { + const deviceId = project.deviceId.slice(0, 7) + // @ts-ignore - to complex to narrow `schemaName` to valid values + const docs = await project[schemaName].getMany() + const expected = generatedDocs.filter((v) => v.schemaName === schemaName) + t.alike( + sortById(docs), + sortById(expected), + `All ${schemaName} docs synced to ${deviceId}` + ) + } + } +}) + +test('start and stop sync', async function (t) { + // Checks that both peers need to start syncing for data to sync, and that + // $sync.stop() actually stops data syncing + const COUNT = 2 + const managers = await createManagers(COUNT) + const [invitor, ...invitees] = managers + const disconnect = connectPeers(managers, { discovery: false }) + const projectId = await invitor.createProject() + await invite({ invitor, invitees, projectId }) + + const projects = await Promise.all( + managers.map((m) => m.getProject(projectId)) + ) + const [invitorProject, inviteeProject] = projects + + const obs1 = await invitorProject.observation.create( + valueOf(generate('observation')[0]) + ) + await waitForSync(projects, 'initial') + inviteeProject.$sync.start() + + await t.exception( + () => pTimeout(waitForSync(projects, 'full'), { milliseconds: 1000 }), + 'wait for sync times out' + ) + + await t.exception( + () => inviteeProject.observation.getByDocId(obs1.docId), + 'before both peers have started sync, doc does not sync' + ) + + invitorProject.$sync.start() + + // Use the same timeout as above, to check that it would have synced given the timeout + await pTimeout(waitForSync(projects, 'full'), { milliseconds: 1000 }) + + const obs1Synced = await inviteeProject.observation.getByDocId(obs1.docId) + + t.alike(obs1Synced, obs1, 'observation is synced') + + inviteeProject.$sync.stop() + + const obs2 = await inviteeProject.observation.create( + valueOf(generate('observation')[0]) + ) + await waitForSync(projects, 'initial') + + await t.exception( + () => pTimeout(waitForSync(projects, 'full'), { milliseconds: 1000 }), + 'wait for sync times out' + ) + + await t.exception( + () => invitorProject.observation.getByDocId(obs2.docId), + 'after stopping sync, data does not sync' + ) + + inviteeProject.$sync.start() + + await pTimeout(waitForSync(projects, 'full'), { milliseconds: 1000 }) + + const obs2Synced = await invitorProject.observation.getByDocId(obs2.docId) + + t.alike(obs2Synced, obs2, 'observation is synced') + + await disconnect() +}) + +test('shares cores', async function (t) { + const COUNT = 5 + const managers = await createManagers(COUNT) + const [invitor, ...invitees] = managers + connectPeers(managers, { discovery: false }) + const projectId = await invitor.createProject() + await invite({ invitor, invitees, projectId }) + + const projects = await Promise.all( + managers.map((m) => m.getProject(projectId)) + ) + const coreManagers = projects.map((p) => p[kCoreManager]) + + await waitForSync(projects, 'initial') + + for (const ns of PRESYNC_NAMESPACES) { + for (const cm of coreManagers) { + const keyCount = getKeys(cm, ns).length + t.is(keyCount, COUNT, 'expected number of cores') + } + } + + // Currently need to start syncing to share other keys - this might change if + // we add keys based on coreOwnership records + for (const project of projects) { + project.$sync.start() + } + + await waitForSync(projects, 'full') + + for (const ns of NAMESPACES) { + for (const cm of coreManagers) { + const keyCount = getKeys(cm, ns).length + t.is(keyCount, COUNT, 'expected number of cores') + } + } +}) + +test('no sync capabilities === no namespaces sync apart from auth', async (t) => { + const COUNT = 3 + const managers = await createManagers(COUNT) + const [invitor, invitee, blocked] = managers + const disconnect1 = connectPeers(managers, { discovery: false }) + const projectId = await invitor.createProject() + await invite({ + invitor, + invitees: [blocked], + projectId, + roleId: BLOCKED_ROLE_ID, + }) + await invite({ + invitor, + invitees: [invitee], + projectId, + roleId: COORDINATOR_ROLE_ID, + }) + + const projects = await Promise.all( + managers.map((m) => m.getProject(projectId)) + ) + const [invitorProject, inviteeProject] = projects + + const generatedDocs = (await seedDatabases([inviteeProject])).flat() + const configDocsCount = generatedDocs.filter( + (doc) => doc.schemaName !== 'observation' + ).length + const dataDocsCount = generatedDocs.length - configDocsCount + + for (const project of projects) { + project.$sync.start() + } + + await waitForSync([inviteeProject, invitorProject], 'full') + + const [invitorState, inviteeState, blockedState] = projects.map((p) => + p.$sync.getState() + ) + + t.is(invitorState.config.localState.have, configDocsCount + COUNT) // count device info doc for each invited device + t.is(invitorState.data.localState.have, dataDocsCount) + t.is(blockedState.config.localState.have, 1) // just the device info doc + t.is(blockedState.data.localState.have, 0) // no data docs synced + + for (const ns of NAMESPACES) { + if (ns === 'auth') { + t.is(invitorState[ns].coreCount, 3) + t.is(inviteeState[ns].coreCount, 3) + t.is(blockedState[ns].coreCount, 3) + } else if (PRESYNC_NAMESPACES.includes(ns)) { + t.is(invitorState[ns].coreCount, 3) + t.is(inviteeState[ns].coreCount, 3) + t.is(blockedState[ns].coreCount, 1) + } else { + t.is(invitorState[ns].coreCount, 2) + t.is(inviteeState[ns].coreCount, 2) + t.is(blockedState[ns].coreCount, 1) + } + t.alike(invitorState[ns].localState, inviteeState[ns].localState) + } + + await disconnect1() +}) diff --git a/test-e2e/utils.js b/test-e2e/utils.js index 04734e8f7..6c9e8bc98 100644 --- a/test-e2e/utils.js +++ b/test-e2e/utils.js @@ -1,3 +1,305 @@ +// @ts-check +import sodium from 'sodium-universal' +import RAM from 'random-access-memory' + +import { MapeoManager } from '../src/index.js' +import { kManagerReplicate, kRPC } from '../src/mapeo-manager.js' +import { MEMBER_ROLE_ID } from '../src/capabilities.js' +import { once } from 'node:events' +import { generate } from '@mapeo/mock-data' +import { valueOf } from '../src/utils.js' +import { randomInt } from 'node:crypto' + +const projectMigrationsFolder = new URL('../drizzle/project', import.meta.url) + .pathname +const clientMigrationsFolder = new URL('../drizzle/client', import.meta.url) + .pathname + +/** + * @param {readonly MapeoManager[]} managers + */ +export async function disconnectPeers(managers) { + return Promise.all( + managers.map(async (manager) => { + return manager.stopLocalPeerDiscovery({ force: true }) + }) + ) +} + +/** + * @param {readonly MapeoManager[]} managers + */ +export function connectPeers(managers, { discovery = true } = {}) { + if (discovery) { + for (const manager of managers) { + manager.startLocalPeerDiscovery() + } + return function destroy() { + return disconnectPeers(managers) + } + } else { + /** @type {import('../src/types.js').ReplicationStream[]} */ + const replicationStreams = [] + for (let i = 0; i < managers.length; i++) { + for (let j = i + 1; j < managers.length; j++) { + const r1 = managers[i][kManagerReplicate](true) + const r2 = managers[j][kManagerReplicate](false) + replicationStreams.push(r1, r2) + r1.pipe(r2).pipe(r1) + } + } + return function destroy() { + const promises = [] + for (const stream of replicationStreams) { + promises.push( + /** @type {Promise} */ + ( + new Promise((res) => { + stream.on('close', res) + stream.destroy() + }) + ) + ) + } + return Promise.all(promises) + } + } +} + +/** + * Invite mapeo clients to a project + * + * @param {{ + * invitor: MapeoManager, + * projectId: string, + * invitees: MapeoManager[], + * roleId?: import('../src/capabilities.js').RoleId, + * reject?: boolean + * }} opts + */ +export async function invite({ + invitor, + projectId, + invitees, + roleId = MEMBER_ROLE_ID, + reject = false, +}) { + const invitorProject = await invitor.getProject(projectId) + const promises = [] + + for (const invitee of invitees) { + promises.push( + invitorProject.$member.invite(invitee.deviceId, { + roleId, + }) + ) + promises.push( + once(invitee.invite, 'invite-received').then(([invite]) => { + return reject + ? invitee.invite.reject(invite.projectId) + : invitee.invite.accept(invite.projectId) + }) + ) + } + + await Promise.allSettled(promises) +} + +/** + * Waits for all manager instances to be connected to each other + * + * @param {readonly MapeoManager[]} managers + */ +export async function waitForPeers(managers) { + const peerCounts = managers.map((manager) => { + return manager[kRPC].peers.filter(({ status }) => status === 'connected') + .length + }) + const expectedCount = managers.length - 1 + return new Promise((res) => { + if (peerCounts.every((v) => v === expectedCount)) { + return res(null) + } + for (const [idx, manager] of managers.entries()) { + manager.on('local-peers', function onPeers(peers) { + const connectedPeerCount = peers.filter( + ({ status }) => status === 'connected' + ).length + peerCounts[idx] = connectedPeerCount + if (connectedPeerCount === expectedCount) { + manager.off('local-peers', onPeers) + } + if (peerCounts.every((v) => v === expectedCount)) { + res(null) + } + }) + } + }) +} + +/** + * Create `count` manager instances. Each instance has a deterministic identity + * keypair so device IDs should be consistent between tests. + * + * @template {number} T + * @param {T} count + * @returns {Promise>} + */ +export async function createManagers(count) { + // @ts-ignore + return Promise.all( + Array(count) + .fill(null) + .map(async (_, i) => { + const name = 'device' + i + const manager = createManager(name) + await manager.setDeviceInfo({ name }) + return manager + }) + ) +} + +/** @param {string} [seed] */ +export function createManager(seed) { + return new MapeoManager({ + rootKey: getRootKey(seed), + projectMigrationsFolder, + clientMigrationsFolder, + dbFolder: ':memory:', + coreStorage: () => new RAM(), + }) +} + +/** @param {string} [seed] */ +function getRootKey(seed) { + const key = Buffer.allocUnsafe(16) + if (!seed) { + sodium.randombytes_buf(key) + } else { + const seedBuf = Buffer.alloc(32) + sodium.crypto_generichash(seedBuf, Buffer.from(seed)) + sodium.randombytes_buf_deterministic(key, seedBuf) + } + return key +} +/** + * Remove undefined properties from an object, to allow deep comparison + * @param {object} obj + */ +export function stripUndef(obj) { + return JSON.parse(JSON.stringify(obj)) +} +/** + * + * @param {number} value + * @param {number} decimalPlaces + */ +export function round(value, decimalPlaces) { + return Math.round(value * 10 ** decimalPlaces) / 10 ** decimalPlaces +} + +/** + * Unlike `mapeo.project.$sync.waitForSync` this also waits for the specified + * number of peers to connect. + * + * @param {import('../src/mapeo-project.js').MapeoProject} project + * @param {string[]} peerIds + * @param {'initial' | 'full'} [type] + */ +async function waitForProjectSync(project, peerIds, type = 'initial') { + const state = await project.$sync.getState() + if (hasPeerIds(state.auth.remoteStates, peerIds)) { + return project.$sync.waitForSync(type) + } + return new Promise((res) => { + project.$sync.on('sync-state', function onState(state) { + if (!hasPeerIds(state.auth.remoteStates, peerIds)) return + project.$sync.off('sync-state', onState) + res(project.$sync.waitForSync(type)) + }) + }) +} + +/** + * @param {Record} remoteStates + * @param {string[]} peerIds + * @returns + */ +function hasPeerIds(remoteStates, peerIds) { + for (const peerId of peerIds) { + if (!(peerId in remoteStates)) return false + } + return true +} + +/** + * Wait for all projects to connect and sync + * + * @param {import('../src/mapeo-project.js').MapeoProject[]} projects + * @param {'initial' | 'full'} [type] + */ +export function waitForSync(projects, type = 'initial') { + return Promise.all( + projects.map((project) => { + const peerIds = projects + .filter((p) => p !== project) + .map((p) => p.deviceId) + return waitForProjectSync(project, peerIds, type) + }) + ) +} + +/** + * @param {import('../src/mapeo-project.js').MapeoProject[]} projects + */ +export function seedDatabases(projects) { + return Promise.all(projects.map((p) => seedProjectDatabase(p))) +} + +const SCHEMAS_TO_SEED = /** @type {const} */ ([ + 'observation', + 'preset', + 'field', +]) + +/** + * @param {import('../src/mapeo-project.js').MapeoProject} project + * @returns {Promise>} + */ +async function seedProjectDatabase(project) { + const promises = [] + for (const schemaName of SCHEMAS_TO_SEED) { + const count = + schemaName === 'observation' ? randomInt(20, 100) : randomInt(0, 10) + let i = 0 + while (i++ < count) { + const value = valueOf(generate(schemaName)[0]) + promises.push( + // @ts-ignore + project[schemaName].create(value) + ) + } + } + return Promise.all(promises) +} + +/** + * @template {object} T + * @param {T[]} arr + * @param {keyof T} key + */ +export function sortBy(arr, key) { + return arr.sort(function (a, b) { + if (a[key] < b[key]) return -1 + if (a[key] > b[key]) return 1 + return 0 + }) +} + +/** @param {import('@mapeo/schema').MapeoDoc[]} docs */ +export function sortById(docs) { + return sortBy(docs, 'docId') +} /** * Lazy way of removing fields with undefined values from an object * @param {unknown} object diff --git a/tests/blob-store/blob-store.js b/tests/blob-store/blob-store.js index 7dc5558cb..c15cd6fc8 100644 --- a/tests/blob-store/blob-store.js +++ b/tests/blob-store/blob-store.js @@ -5,10 +5,14 @@ import { pipelinePromise as pipeline } from 'streamx' import { randomBytes } from 'node:crypto' import fs from 'fs' import { readFile } from 'fs/promises' -import { createCoreManager, waitForCores } from '../helpers/core-manager.js' +import { + replicate, + createCoreManager, + waitForCores, +} from '../helpers/core-manager.js' import { BlobStore } from '../../src/blob-store/index.js' import { setTimeout } from 'node:timers/promises' -import { replicateBlobs, concat } from '../helpers/blob-store.js' +import { concat } from '../helpers/blob-store.js' import { discoveryKey } from 'hypercore-crypto' // Test with buffers that are 3 times the default blockSize for hyperblobs @@ -86,11 +90,13 @@ test('get(), initialized but unreplicated drive', async (t) => { }) const driveId = await bs1.put(blob1Id, blob1) - const { destroy } = replicateBlobs(cm1, cm2) + const { destroy } = replicate(cm1, cm2) await waitForCores(cm2, [cm1.getWriterCore('blobIndex').key]) /** @type {any} */ - const replicatedCore = cm2.getCoreByDiscoveryKey(Buffer.from(driveId, 'hex')) + const { core: replicatedCore } = cm2.getCoreByDiscoveryKey( + Buffer.from(driveId, 'hex') + ) await replicatedCore.update({ wait: true }) await destroy() t.is(replicatedCore.contiguousLength, 0, 'data is not downloaded') @@ -111,10 +117,12 @@ test('get(), replicated blobIndex, but blobs not replicated', async (t) => { }) const driveId = await bs1.put(blob1Id, blob1) - const { destroy } = replicateBlobs(cm1, cm2) + const { destroy } = replicate(cm1, cm2) await waitForCores(cm2, [cm1.getWriterCore('blobIndex').key]) /** @type {any} */ - const replicatedCore = cm2.getCoreByDiscoveryKey(Buffer.from(driveId, 'hex')) + const { core: replicatedCore } = cm2.getCoreByDiscoveryKey( + Buffer.from(driveId, 'hex') + ) await replicatedCore.update({ wait: true }) await replicatedCore.download({ end: replicatedCore.length }).done() await destroy() @@ -273,13 +281,13 @@ test('live download', async function (t) { // STEP 1: Write a blob to CM1 const driveId1 = await bs1.put(blob1Id, blob1) // STEP 2: Replicate CM1 with CM3 - const { destroy: destroy1 } = replicateBlobs(cm1, cm3) + const { destroy: destroy1 } = replicate(cm1, cm3) // STEP 3: Start live download to CM3 const liveDownload = bs3.download() // STEP 4: Wait for blobs to be downloaded await downloaded(liveDownload) // STEP 5: Replicate CM2 with CM3 - const { destroy: destroy2 } = replicateBlobs(cm2, cm3) + const { destroy: destroy2 } = replicate(cm2, cm3) // STEP 6: Write a blob to CM2 const driveId2 = await bs2.put(blob2Id, blob2) // STEP 7: Wait for blobs to be downloaded @@ -328,7 +336,7 @@ test('sparse live download', async function (t) { await bs1.put(blob2Id, blob2) await bs1.put(blob3Id, blob3) - const { destroy } = replicateBlobs(cm1, cm2) + const { destroy } = replicate(cm1, cm2) const liveDownload = bs2.download({ photo: ['original', 'preview'] }) await downloaded(liveDownload) @@ -365,7 +373,7 @@ test('cancelled live download', async function (t) { // STEP 1: Write a blob to CM1 const driveId1 = await bs1.put(blob1Id, blob1) // STEP 2: Replicate CM1 with CM3 - const { destroy: destroy1 } = replicateBlobs(cm1, cm3) + const { destroy: destroy1 } = replicate(cm1, cm3) // STEP 3: Start live download to CM3 const ac = new AbortController() const liveDownload = bs3.download(undefined, { signal: ac.signal }) @@ -374,7 +382,7 @@ test('cancelled live download', async function (t) { // STEP 5: Cancel download ac.abort() // STEP 6: Replicate CM2 with CM3 - const { destroy: destroy2 } = replicateBlobs(cm2, cm3) + const { destroy: destroy2 } = replicate(cm2, cm3) // STEP 7: Write a blob to CM2 const driveId2 = await bs2.put(blob2Id, blob2) // STEP 8: Wait for blobs to (not) download diff --git a/tests/core-manager.js b/tests/core-manager.js index cb7e2b2cf..5be685fcd 100644 --- a/tests/core-manager.js +++ b/tests/core-manager.js @@ -6,7 +6,10 @@ import { createCoreManager, replicate } from './helpers/core-manager.js' import { randomBytes } from 'crypto' import Sqlite from 'better-sqlite3' import { KeyManager } from '@mapeo/crypto' -import { CoreManager, unreplicate } from '../src/core-manager/index.js' +import { + CoreManager, + kCoreManagerReplicate, +} from '../src/core-manager/index.js' import RemoteBitfield from '../src/core-manager/remote-bitfield.js' import assert from 'assert' import { once } from 'node:events' @@ -15,8 +18,8 @@ import { exec } from 'child_process' import { RandomAccessFilePool } from '../src/core-manager/random-access-file-pool.js' import RandomAccessFile from 'random-access-file' import path from 'path' -import { setTimeout as delay } from 'node:timers/promises' import { Transform } from 'streamx' +import { waitForCores } from './helpers/core-manager.js' async function createCore(key) { const core = new Hypercore(RAM, key) @@ -24,76 +27,6 @@ async function createCore(key) { return core } -test('shares auth cores', async function (t) { - const projectKey = randomBytes(32) - const cm1 = createCoreManager({ projectKey }) - const cm2 = createCoreManager({ projectKey }) - - replicate(cm1, cm2) - - await Promise.all([ - waitForCores(cm1, getKeys(cm2, 'auth')), - waitForCores(cm2, getKeys(cm1, 'auth')), - ]) - - const cm1Keys = getKeys(cm1, 'auth').sort(Buffer.compare) - const cm2Keys = getKeys(cm2, 'auth').sort(Buffer.compare) - - t.alike(cm1Keys, cm2Keys, 'Share same auth cores') -}) - -test('shares other cores', async function (t) { - const projectKey = randomBytes(32) - const cm1 = createCoreManager({ projectKey }) - const cm2 = createCoreManager({ projectKey }) - - const { - rsm: [rsm1, rsm2], - } = replicate(cm1, cm2) - - for (const namespace of ['config', 'data', 'blob', 'blobIndex']) { - rsm1.enableNamespace(namespace) - rsm2.enableNamespace(namespace) - await Promise.all([ - waitForCores(cm1, getKeys(cm2, namespace)), - waitForCores(cm2, getKeys(cm1, namespace)), - ]) - const cm1Keys = getKeys(cm1, namespace).sort(Buffer.compare) - const cm2Keys = getKeys(cm2, namespace).sort(Buffer.compare) - - t.alike(cm1Keys, cm2Keys, `Share same ${namespace} cores`) - } -}) - -// Testing this case because in real-use namespaces are not enabled at the same time -test('shares cores if namespaces enabled at different times', async function (t) { - const projectKey = randomBytes(32) - const cm1 = createCoreManager({ projectKey }) - const cm2 = createCoreManager({ projectKey }) - - const { - rsm: [rsm1, rsm2], - } = replicate(cm1, cm2) - - for (const namespace of ['config', 'data', 'blob', 'blobIndex']) { - rsm1.enableNamespace(namespace) - } - - await delay(1000) - - for (const namespace of ['config', 'data', 'blob', 'blobIndex']) { - rsm2.enableNamespace(namespace) - await Promise.all([ - waitForCores(cm1, getKeys(cm2, namespace)), - waitForCores(cm2, getKeys(cm1, namespace)), - ]) - const cm1Keys = getKeys(cm1, namespace).sort(Buffer.compare) - const cm2Keys = getKeys(cm2, namespace).sort(Buffer.compare) - - t.alike(cm1Keys, cm2Keys, `Share same ${namespace} cores`) - } -}) - test('project creator auth core has project key', async function (t) { const sqlite = new Sqlite(':memory:') const keyManager = new KeyManager(randomBytes(16)) @@ -167,7 +100,6 @@ test('eagerly updates remote bitfields', async function (t) { .done() await destroyReplication() await cm1Core.clear(0, 2) - { // This is ensuring that bitfields also get propogated in the other // direction, e.g. from the non-writer to the writer @@ -224,62 +156,6 @@ test('eagerly updates remote bitfields', async function (t) { } }) -test('works with an existing protocol stream for replications', async function (t) { - const projectKey = randomBytes(32) - const cm1 = createCoreManager({ projectKey }) - const cm2 = createCoreManager({ projectKey }) - - const n1 = new NoiseSecretStream(true) - const n2 = new NoiseSecretStream(false) - n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream) - - const s1 = Hypercore.createProtocolStream(n1) - const s2 = Hypercore.createProtocolStream(n2) - - cm1.replicate(s1) - cm2.replicate(s2) - - await Promise.all([ - waitForCores(cm1, getKeys(cm2, 'auth')), - waitForCores(cm2, getKeys(cm1, 'auth')), - ]) - - const cm1Keys = getKeys(cm1, 'auth').sort(Buffer.compare) - const cm2Keys = getKeys(cm2, 'auth').sort(Buffer.compare) - - t.alike(cm1Keys, cm2Keys, 'Share same auth cores') -}) - -test.skip('can mux other project replications over same stream', async function (t) { - // This test fails because https://github.com/holepunchto/corestore/issues/45 - // The `ondiscoverykey` hook for `Hypercore.createProtocolStream()` that we - // use to know when other cores are muxed in the stream is only called the - // first time the protocol stream is created. When a second core replicates - // to the same stream, it sees it is already a protomux stream, and it does - // not add the notify hook for `ondiscoverykey`. - // We might be able to work around this if we want to enable multi-project - // muxing before the issue is resolved by creating the protomux stream outside - // the core manager, and then somehow hooking into the relevant corestore. - t.plan(2) - const projectKey = randomBytes(32) - const cm1 = createCoreManager({ projectKey }) - const cm2 = createCoreManager({ projectKey }) - const otherProject = createCoreManager() - - const n1 = new NoiseSecretStream(true) - const n2 = new NoiseSecretStream(false) - n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream) - - await Promise.all([ - waitForCores(cm1, getKeys(cm2, 'auth')), - waitForCores(cm2, getKeys(cm1, 'auth')), - ]) - - cm1.replicate(n1) - otherProject.replicate(n2) - cm2.replicate(n2) -}) - test('multiplexing waits for cores to be added', async function (t) { // Mapeo code expects replication to work when cores are not added to the // replication stream at the same time. This is not explicitly tested in @@ -326,8 +202,6 @@ test('close()', async (t) => { t.is(core.sessions.length, 0, 'no open sessions') } } - const ns = new NoiseSecretStream(true) - t.exception(() => cm.replicate(ns), /closed/) }) test('Added cores are persisted', async (t) => { @@ -365,14 +239,8 @@ test('encryption', async function (t) { const cm2 = createCoreManager({ projectKey }) const cm3 = createCoreManager({ projectKey, encryptionKeys }) - const { rsm: rsm1 } = replicate(cm1, cm2) - const { rsm: rsm2 } = replicate(cm1, cm3) - - for (const rsm of [...rsm1, ...rsm2]) { - for (const ns of CoreManager.namespaces) { - rsm.enableNamespace(ns) - } - } + replicate(cm1, cm2) + replicate(cm1, cm3) for (const ns of CoreManager.namespaces) { const { core, key } = cm1.getWriterCore(ns) @@ -442,39 +310,6 @@ test('poolSize limits number of open file descriptors', async function (t) { }) }) -async function waitForCores(coreManager, keys) { - const allKeys = getAllKeys(coreManager) - if (hasKeys(keys, allKeys)) return - return new Promise((res) => { - coreManager.on('add-core', function onAddCore({ key }) { - allKeys.push(key) - if (hasKeys(keys, allKeys)) { - coreManager.off('add-core', onAddCore) - res() - } - }) - }) -} - -function getAllKeys(coreManager) { - const keys = [] - for (const namespace of CoreManager.namespaces) { - keys.push.apply(keys, getKeys(coreManager, namespace)) - } - return keys -} - -function getKeys(coreManager, namespace) { - return coreManager.getCores(namespace).map(({ key }) => key) -} - -function hasKeys(someKeys, allKeys) { - for (const key of someKeys) { - if (!allKeys.find((k) => k.equals(key))) return false - } - return true -} - test('sends "haves" bitfields over project creator core replication stream', async function (t) { const projectKey = randomBytes(32) const cm1 = createCoreManager({ projectKey }) @@ -510,8 +345,8 @@ test('sends "haves" bitfields over project creator core replication stream', asy const cm1Core = cm1.getWriterCore('data').core await cm1Core.ready() const batchSize = 4096 - // Create 1 million entries in hypercore - for (let i = 0; i < 2 ** 20; i += batchSize) { + // Create 4 million entries in hypercore - will be at least two have bitfields + for (let i = 0; i < 2 ** 22; i += batchSize) { const data = Array(batchSize) .fill(null) .map(() => 'block') @@ -525,8 +360,8 @@ test('sends "haves" bitfields over project creator core replication stream', asy const n2 = new NoiseSecretStream(false) n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream) - cm1.replicate(n1) - cm2.replicate(n2) + cm1[kCoreManagerReplicate](n1) + cm2[kCoreManagerReplicate](n2) // Need to wait for now, since no event for when a remote bitfield is updated await new Promise((res) => setTimeout(res, 200)) @@ -622,40 +457,6 @@ test('unreplicate', async (t) => { }) }) -test('disableNamespace and re-enable', async (t) => { - const projectKey = randomBytes(32) - const cm1 = createCoreManager({ projectKey }) - const cm2 = createCoreManager({ projectKey }) - - const { - rsm: [rsm1, rsm2], - } = replicate(cm1, cm2) - - rsm1.enableNamespace('data') - rsm2.enableNamespace('data') - - await Promise.all([ - waitForCores(cm1, getKeys(cm2, 'data')), - waitForCores(cm2, getKeys(cm1, 'data')), - ]) - - const data1CR = cm1.getWriterCore('data') - await data1CR.core.append(['a', 'b', 'c']) - - const data1ReplicaCore = cm2.getCoreByKey(data1CR.key) - t.is((await data1ReplicaCore.get(2, { timeout: 200 })).toString(), 'c') - - rsm1.disableNamespace('data') - - await data1CR.core.append(['d', 'e', 'f']) - - await t.exception(() => data1ReplicaCore.get(5, { timeout: 200 })) - - rsm1.enableNamespace('data') - - t.is((await data1ReplicaCore.get(5, { timeout: 200 })).toString(), 'f') -}) - const DEBUG = process.env.DEBUG // Compare two bitfields (instance of core.core.bitfield or peer.remoteBitfield) @@ -728,3 +529,17 @@ function latencyStream(delay = 0) { }, }) } + +/** + * + * @param {Hypercore<'binary', any>} core + * @param {import('protomux')} protomux + */ +export function unreplicate(core, protomux) { + const peerToUnreplicate = core.peers.find( + (peer) => peer.protomux === protomux + ) + if (!peerToUnreplicate) return + peerToUnreplicate.channel.close() + return +} diff --git a/tests/data-type.js b/tests/data-type.js index 14940bb28..2194f9cd9 100644 --- a/tests/data-type.js +++ b/tests/data-type.js @@ -84,7 +84,7 @@ test('test validity of `createdBy` field from another peer', async (t) => { const obs = await dt1.create(obsFixture) const driveId = ds1.writerCore.key - const { destroy } = replicateDataStore(cm1, cm2) + const { destroy } = replicate(cm1, cm2) await waitForCores(cm2, [driveId]) const replicatedCore = cm2.getCoreByKey(driveId) await replicatedCore.update({ wait: true }) @@ -135,17 +135,3 @@ async function testenv(opts) { return { coreManager, dataType, dataStore } } - -function replicateDataStore(cm1, cm2) { - const { - rsm: [rsm1, rsm2], - destroy, - } = replicate(cm1, cm2) - - rsm1.enableNamespace('data') - rsm2.enableNamespace('data') - return { - rsm: /** @type {const} */ ([rsm1, rsm2]), - destroy, - } -} diff --git a/tests/fastify-plugins/blobs.js b/tests/fastify-plugins/blobs.js index 752818aef..84b257ddc 100644 --- a/tests/fastify-plugins/blobs.js +++ b/tests/fastify-plugins/blobs.js @@ -9,8 +9,11 @@ import fastify from 'fastify' import { BlobStore } from '../../src/blob-store/index.js' import BlobServerPlugin from '../../src/fastify-plugins/blobs.js' import { projectKeyToPublicId } from '../../src/utils.js' -import { replicateBlobs } from '../helpers/blob-store.js' -import { createCoreManager, waitForCores } from '../helpers/core-manager.js' +import { + createCoreManager, + waitForCores, + replicate, +} from '../helpers/core-manager.js' test('Plugin throws error if missing getBlobStore option', async (t) => { const server = fastify() @@ -224,12 +227,12 @@ test('GET photo returns 404 when trying to get non-replicated blob', async (t) = const [{ blobId }] = data - const { destroy } = replicateBlobs(cm1, cm2) + const { destroy } = replicate(cm1, cm2) await waitForCores(cm2, [cm1.getWriterCore('blobIndex').key]) /** @type {any}*/ - const replicatedCore = cm2.getCoreByDiscoveryKey( + const { core: replicatedCore } = cm2.getCoreByDiscoveryKey( Buffer.from(blobId.driveId, 'hex') ) await replicatedCore.update({ wait: true }) diff --git a/tests/helpers/blob-store.js b/tests/helpers/blob-store.js index 2f49dd305..a56b248c4 100644 --- a/tests/helpers/blob-store.js +++ b/tests/helpers/blob-store.js @@ -1,5 +1,4 @@ // @ts-nocheck -import { replicate } from './core-manager.js' import { pipelinePromise as pipeline, Writable } from 'streamx' import { BlobStore } from '../../src/blob-store/index.js' @@ -16,26 +15,6 @@ export function createBlobStore(options = {}) { return { blobStore, coreManager } } -/** - * - * @param {import('../../src/core-manager/index.js').CoreManager} cm1 - * @param {import('../../src/core-manager/index.js').CoreManager} cm2 - */ -export function replicateBlobs(cm1, cm2) { - const { - rsm: [rsm1, rsm2], - destroy, - } = replicate(cm1, cm2) - rsm1.enableNamespace('blobIndex') - rsm1.enableNamespace('blob') - rsm2.enableNamespace('blobIndex') - rsm2.enableNamespace('blob') - return { - rsm: /** @type {const} */ ([rsm1, rsm2]), - destroy, - } -} - /** * @param {*} rs * @returns {Promise} diff --git a/tests/helpers/core-manager.js b/tests/helpers/core-manager.js index d7b2a5261..a4663fab4 100644 --- a/tests/helpers/core-manager.js +++ b/tests/helpers/core-manager.js @@ -1,11 +1,13 @@ // @ts-nocheck -import { CoreManager } from '../../src/core-manager/index.js' +import { + CoreManager, + kCoreManagerReplicate, +} from '../../src/core-manager/index.js' import Sqlite from 'better-sqlite3' import { randomBytes } from 'crypto' import { KeyManager } from '@mapeo/crypto' import RAM from 'random-access-memory' import NoiseSecretStream from '@hyperswarm/secret-stream' - /** * * @param {Partial[0]> & { rootKey?: Buffer }} param0 @@ -23,6 +25,7 @@ export function createCoreManager({ keyManager, storage: RAM, projectKey, + autoDownload: false, ...opts, }) } @@ -43,8 +46,8 @@ export function replicate( const n2 = new NoiseSecretStream(false, undefined, { keyPair: kp2 }) n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream) - const rsm1 = cm1.replicate(n1) - const rsm2 = cm2.replicate(n2) + cm1[kCoreManagerReplicate](n1) + cm2[kCoreManagerReplicate](n2) async function destroy() { await Promise.all([ @@ -60,7 +63,6 @@ export function replicate( } return { - rsm: [rsm1, rsm2], destroy, } } diff --git a/tests/local-peers.js b/tests/local-peers.js index 084b7e8ba..9b151c7e7 100644 --- a/tests/local-peers.js +++ b/tests/local-peers.js @@ -109,7 +109,7 @@ test('Send invite, duplicate connections', async (t) => { t.is(peers3.length, 1) t.ok( peers3[0].connectedAt > peers1[0].connectedAt, - 'later connected peer is not used' + `later connected peer is not used: ${peers3[0].connectedAt} ${peers1[0].connectedAt}` ) { diff --git a/tests/sync/peer-sync-controller.js b/tests/sync/peer-sync-controller.js deleted file mode 100644 index 5d9e2c220..000000000 --- a/tests/sync/peer-sync-controller.js +++ /dev/null @@ -1,188 +0,0 @@ -// @ts-check - -import test from 'brittle' -import Hypercore from 'hypercore' -import { PeerSyncController } from '../../src/sync/peer-sync-controller.js' -import { createCoreManager } from '../helpers/core-manager.js' -import { KeyManager } from '@mapeo/crypto' -import { once } from 'node:events' -import { setTimeout } from 'node:timers/promises' -import { NAMESPACES } from '../../src/core-manager/index.js' -import { SyncState } from '../../src/sync/sync-state.js' -import { - BLOCKED_ROLE_ID, - CREATOR_CAPABILITIES, - DEFAULT_CAPABILITIES, -} from '../../src/capabilities.js' - -test('auth, config and blobIndex enabled by default', async (t) => { - const { - coreManagers: [cm1, cm2], - } = await testenv(CREATOR_CAPABILITIES) - - const preSyncNamespaces = /** @type {const} */ ([ - 'auth', - 'config', - 'blobIndex', - ]) - - const peerAddPromises = [] - for (const ns of preSyncNamespaces) { - peerAddPromises.push( - once(cm1.getWriterCore(ns).core, 'peer-add'), - once(cm1.getWriterCore(ns).core, 'peer-add') - ) - } - await Promise.all(peerAddPromises) - t.pass('pre-sync cores connected') - - // Wait to give other namespaces a chance to connect (they shouldn't) - await setTimeout(500) - - for (const ns of NAMESPACES) { - for (const cm of [cm1, cm2]) { - const nsCores = cm.getCores(ns) - t.is( - nsCores.length, - includes(preSyncNamespaces, ns) ? 2 : 1, - 'preSync namespaces have 2 cores, others have 1' - ) - for (const { core } of nsCores) { - if (includes(preSyncNamespaces, ns)) { - t.is(core.peers.length, 1, 'pre-sync namespace cores have one peer') - } else { - t.is(core.peers.length, 0, 'non-pre-sync cores have no peers') - } - } - } - } -}) - -test('enabling data sync replicates all cores', async (t) => { - const { - coreManagers: [cm1, cm2], - peerSyncControllers: [psc1, psc2], - } = await testenv(CREATOR_CAPABILITIES) - - psc1.enableDataSync() - psc2.enableDataSync() - - const peerAddPromises = [] - for (const ns of NAMESPACES) { - peerAddPromises.push( - once(cm1.getWriterCore(ns).core, 'peer-add'), - once(cm1.getWriterCore(ns).core, 'peer-add') - ) - } - await Promise.all(peerAddPromises) - - for (const ns of NAMESPACES) { - for (const [i, cm] of [cm1, cm2].entries()) { - const nsCores = cm.getCores(ns) - t.is(nsCores.length, 2, `cm${i + 1}: namespace ${ns} has 2 cores now`) - for (const { core } of nsCores) { - t.is( - core.peers.length, - 1, - `cm${i + 1}: ${ns} ${ - core === cm.getWriterCore(ns).core ? 'own' : 'synced' - } core is connected` - ) - } - } - } -}) - -test('no sync capabilities === no namespaces sync apart from auth', async (t) => { - const { - coreManagers: [cm1, cm2], - peerSyncControllers: [psc1, psc2], - } = await testenv(DEFAULT_CAPABILITIES[BLOCKED_ROLE_ID]) - - psc1.enableDataSync() - psc2.enableDataSync() - - // Wait to give cores a chance to connect - await setTimeout(500) - - for (const ns of NAMESPACES) { - for (const cm of [cm1, cm2]) { - const nsCores = cm.getCores(ns) - if (ns === 'auth') { - t.is(nsCores.length, 2, `all auth cores have been shared`) - // no guarantees about sharing of other cores yet - } - for (const { core } of nsCores) { - const isCreatorCore = core === cm.creatorCore - if (isCreatorCore) { - t.is(core.peers.length, 1, 'creator core remains connected') - } else { - t.is(core.peers.length, 0, 'core is disconnected') - } - } - } - } -}) - -/** - * - * @param {import('../../src/capabilities.js').Capability} cap - * @returns - */ -async function testenv(cap) { - const { publicKey: projectKey, secretKey: projectSecretKey } = - KeyManager.generateProjectKeypair() - const cm1 = await createCoreManager({ projectKey, projectSecretKey }) - const cm2 = await createCoreManager({ projectKey }) - - const stream1 = Hypercore.createProtocolStream(true, { - ondiscoverykey: (discoveryKey) => - cm1.handleDiscoveryKey(discoveryKey, stream1), - }) - const stream2 = Hypercore.createProtocolStream(false, { - ondiscoverykey: (discoveryKey) => - cm2.handleDiscoveryKey(discoveryKey, stream2), - }) - stream1.pipe(stream2).pipe(stream1) - - const psc1 = new PeerSyncController({ - protomux: stream1.noiseStream.userData, - coreManager: cm1, - syncState: new SyncState({ coreManager: cm1 }), - // @ts-expect-error - capabilities: { - async getCapabilities() { - return cap - }, - }, - }) - const psc2 = new PeerSyncController({ - protomux: stream2.noiseStream.userData, - coreManager: cm2, - syncState: new SyncState({ coreManager: cm2 }), - // @ts-expect-error - capabilities: { - async getCapabilities() { - return cap - }, - }, - }) - - return { - peerSyncControllers: [psc1, psc2], - coreManagers: [cm1, cm2], - } -} - -/** - * Helper for Typescript array.prototype.includes - * - * @template {U} T - * @template U - * @param {ReadonlyArray} coll - * @param {U} el - * @returns {el is T} - */ -function includes(coll, el) { - return coll.includes(/** @type {T} */ (el)) -} diff --git a/types/corestore.d.ts b/types/corestore.d.ts index a4bd6bfc2..41545c146 100644 --- a/types/corestore.d.ts +++ b/types/corestore.d.ts @@ -27,6 +27,7 @@ declare module 'corestore' { options: Omit & { key?: Buffer | string | undefined keyPair: { publicKey: Buffer; secretKey?: Buffer | undefined | null } + sparse?: boolean } ): Hypercore replicate: typeof Hypercore.prototype.replicate