diff --git a/package-lock.json b/package-lock.json index e326a3d9d..34a791f94 100644 --- a/package-lock.json +++ b/package-lock.json @@ -53,6 +53,7 @@ "devDependencies": { "@bufbuild/buf": "^1.26.1", "@hyperswarm/testnet": "^3.1.2", + "@mapeo/mock-data": "^1.0.1", "@sinonjs/fake-timers": "^10.0.2", "@types/b4a": "^1.6.0", "@types/debug": "^4.1.8", @@ -77,6 +78,7 @@ "prettier": "^2.8.8", "random-access-file": "^4.0.4", "random-access-memory": "^6.2.0", + "random-bytes-readable-stream": "^3.0.0", "rimraf": "^5.0.1", "streamx": "^2.15.1", "tempy": "^3.1.0", @@ -377,6 +379,22 @@ "node": "^12.22.0 || ^14.17.0 || >=16.0.0" } }, + "node_modules/@faker-js/faker": { + "version": "8.3.1", + "resolved": "https://registry.npmjs.org/@faker-js/faker/-/faker-8.3.1.tgz", + "integrity": "sha512-FdgpFxY6V6rLZE9mmIBb9hM0xpfvQOSNOLnzolzKwsE1DH+gC7lEKV1p1IbR0lAYyvYd5a4u3qWJzowUkw1bIw==", + "dev": true, + "funding": [ + { + "type": "opencollective", + "url": "https://opencollective.com/fakerjs" + } + ], + "engines": { + "node": "^14.17.0 || ^16.13.0 || >=18.0.0", + "npm": ">=6.14.13" + } + }, "node_modules/@fastify/ajv-compiler": { "version": "3.5.0", "license": "MIT", @@ -651,6 +669,22 @@ "z32": "^1.0.0" } }, + "node_modules/@mapeo/mock-data": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/@mapeo/mock-data/-/mock-data-1.0.1.tgz", + "integrity": "sha512-Mdmyn3dCjF3wuwuGJhjGjVnydwHoZPgpAyA8JJvZDBNr1qUbeQyq6mrBr+Tg90ZBRY7G9S0wxABD+OJOKZhQdg==", + "dev": true, + "dependencies": { + "@faker-js/faker": "^8.3.1", + "@mapeo/schema": "^3.0.0-next.11", + "json-schema-faker": "^0.5.3", + "type-fest": "^4.8.0" + }, + "bin": { + "generate-mapeo-data": "bin/generate-mapeo-data.js", + "list-mapeo-schemas": "bin/list-mapeo-schemas.js" + } + }, "node_modules/@mapeo/schema": { "version": "3.0.0-next.13", "resolved": "https://registry.npmjs.org/@mapeo/schema/-/schema-3.0.0-next.13.tgz", @@ -1431,6 +1465,12 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/call-me-maybe": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/call-me-maybe/-/call-me-maybe-1.0.2.tgz", + "integrity": "sha512-HpX65o1Hnr9HH25ojC1YGs7HCQLq0GCOibSaWER0eNpgJ/Z1MZv2mTc7+xh6WOPxbRVcmgbv4hGU+uSQ/2xFZQ==", + "dev": true + }, "node_modules/callsites": { "version": "3.1.0", "dev": true, @@ -2565,6 +2605,19 @@ "url": "https://opencollective.com/eslint" } }, + "node_modules/esprima": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/esprima/-/esprima-4.0.1.tgz", + "integrity": "sha512-eGuFFw7Upda+g4p+QHvnW0RyTX/SVeJBDM/gCtMARO0cLuT2HcEKnTPvhjV6aGeqrCB/sbNop0Kszm0jsaWU4A==", + "dev": true, + "bin": { + "esparse": "bin/esparse.js", + "esvalidate": "bin/esvalidate.js" + }, + "engines": { + "node": ">=4" + } + }, "node_modules/esquery": { "version": "1.5.0", "dev": true, @@ -2967,6 +3020,12 @@ "node": ">=8.0.0" } }, + "node_modules/format-util": { + "version": "1.0.5", + "resolved": "https://registry.npmjs.org/format-util/-/format-util-1.0.5.tgz", + "integrity": "sha512-varLbTj0e0yVyRpqQhuWV+8hlePAgaoFRhNFj50BNjEIrw1/DphHSObtqwskVCPWNgzwPoQrZAbfa/SBiicNeg==", + "dev": true + }, "node_modules/forwarded": { "version": "0.2.0", "license": "MIT", @@ -3987,6 +4046,53 @@ "dev": true, "license": "MIT" }, + "node_modules/json-schema-faker": { + "version": "0.5.3", + "resolved": "https://registry.npmjs.org/json-schema-faker/-/json-schema-faker-0.5.3.tgz", + "integrity": "sha512-BeIrR0+YSrTbAR9dOMnjbFl1MvHyXnq+Wpdw1FpWZDHWKLzK229hZ5huyPcmzFUfVq1ODwf40WdGVoE266UBUg==", + "dev": true, + "dependencies": { + "json-schema-ref-parser": "^6.1.0", + "jsonpath-plus": "^7.2.0" + }, + "bin": { + "jsf": "bin/gen.cjs" + } + }, + "node_modules/json-schema-ref-parser": { + "version": "6.1.0", + "resolved": "https://registry.npmjs.org/json-schema-ref-parser/-/json-schema-ref-parser-6.1.0.tgz", + "integrity": "sha512-pXe9H1m6IgIpXmE5JSb8epilNTGsmTb2iPohAXpOdhqGFbQjNeHHsZxU+C8w6T81GZxSPFLeUoqDJmzxx5IGuw==", + "deprecated": "Please switch to @apidevtools/json-schema-ref-parser", + "dev": true, + "dependencies": { + "call-me-maybe": "^1.0.1", + "js-yaml": "^3.12.1", + "ono": "^4.0.11" + } + }, + "node_modules/json-schema-ref-parser/node_modules/argparse": { + "version": "1.0.10", + "resolved": "https://registry.npmjs.org/argparse/-/argparse-1.0.10.tgz", + "integrity": "sha512-o5Roy6tNG4SL/FOkCAN6RzjiakZS25RLYFrcMttJqbdd8BWrnA+fGz57iN5Pb06pvBGvl5gQ0B48dJlslXvoTg==", + "dev": true, + "dependencies": { + "sprintf-js": "~1.0.2" + } + }, + "node_modules/json-schema-ref-parser/node_modules/js-yaml": { + "version": "3.14.1", + "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-3.14.1.tgz", + "integrity": "sha512-okMH7OXXJ7YrN9Ok3/SXrnu4iX9yOk+25nqX4imS2npuvTYDmo/QEZoqwZkYaIDk3jVvBOTOIEgEhaLOynBS9g==", + "dev": true, + "dependencies": { + "argparse": "^1.0.7", + "esprima": "^4.0.0" + }, + "bin": { + "js-yaml": "bin/js-yaml.js" + } + }, "node_modules/json-schema-traverse": { "version": "0.4.1", "dev": true, @@ -4031,6 +4137,15 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/jsonpath-plus": { + "version": "7.2.0", + "resolved": "https://registry.npmjs.org/jsonpath-plus/-/jsonpath-plus-7.2.0.tgz", + "integrity": "sha512-zBfiUPM5nD0YZSBT/o/fbCUlCcepMIdP0CJZxM1+KgA4f2T206f6VAg9e7mX35+KlMaIc5qXW34f3BnwJ3w+RA==", + "dev": true, + "engines": { + "node": ">=12.0.0" + } + }, "node_modules/junk": { "version": "4.0.1", "dev": true, @@ -5157,6 +5272,15 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/ono": { + "version": "4.0.11", + "resolved": "https://registry.npmjs.org/ono/-/ono-4.0.11.tgz", + "integrity": "sha512-jQ31cORBFE6td25deYeD80wxKBMj+zBmHTrVxnc6CKhx8gho6ipmWM5zj/oeoqioZ99yqBls9Z/9Nss7J26G2g==", + "dev": true, + "dependencies": { + "format-util": "^1.0.3" + } + }, "node_modules/open": { "version": "7.4.2", "resolved": "https://registry.npmjs.org/open/-/open-7.4.2.tgz", @@ -5824,6 +5948,18 @@ "version": "1.0.0", "license": "MIT" }, + "node_modules/random-bytes-readable-stream": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/random-bytes-readable-stream/-/random-bytes-readable-stream-3.0.0.tgz", + "integrity": "sha512-GrDPlkikCTvAOClNgxbbZg2Xq84lmBqhCkkPuDh92vgsDfW9dvrp4kQpE7AOL/3B3j3BATkQocpnCp/bUBn9NQ==", + "dev": true, + "engines": { + "node": "^12.20.0 || ^14.13.1 || >=16.0.0" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/random-bytes-seed": { "version": "1.0.3", "resolved": "https://registry.npmjs.org/random-bytes-seed/-/random-bytes-seed-1.0.3.tgz", @@ -6729,6 +6865,12 @@ "node": ">= 10.x" } }, + "node_modules/sprintf-js": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/sprintf-js/-/sprintf-js-1.0.3.tgz", + "integrity": "sha512-D9cPgkvLlV3t3IzL0D0YLvGA9Ahk4PcvVwUbN0dSGr1aP0Nrt4AEnTUbuGvquEC0mA64Gqt1fzirlRs5ibXx8g==", + "dev": true + }, "node_modules/stackframe": { "version": "1.3.4", "dev": true, @@ -7201,9 +7343,9 @@ } }, "node_modules/type-fest": { - "version": "4.5.0", - "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-4.5.0.tgz", - "integrity": "sha512-diLQivFzddJl4ylL3jxSkEc39Tpw7o1QeEHIPxVwryDK2lpB7Nqhzhuo6v5/Ls08Z0yPSAhsyAWlv1/H0ciNmw==", + "version": "4.8.1", + "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-4.8.1.tgz", + "integrity": "sha512-ShaaYnjf+0etG8W/FumARKMjjIToy/haCaTjN2dvcewOSoNqCQzdgG7m2JVOlM5qndGTHjkvsrWZs+k/2Z7E0Q==", "engines": { "node": ">=16" }, diff --git a/package.json b/package.json index c642be21f..5374bdd0a 100644 --- a/package.json +++ b/package.json @@ -68,6 +68,7 @@ "devDependencies": { "@bufbuild/buf": "^1.26.1", "@hyperswarm/testnet": "^3.1.2", + "@mapeo/mock-data": "^1.0.1", "@sinonjs/fake-timers": "^10.0.2", "@types/b4a": "^1.6.0", "@types/debug": "^4.1.8", @@ -92,6 +93,7 @@ "prettier": "^2.8.8", "random-access-file": "^4.0.4", "random-access-memory": "^6.2.0", + "random-bytes-readable-stream": "^3.0.0", "rimraf": "^5.0.1", "streamx": "^2.15.1", "tempy": "^3.1.0", diff --git a/src/capabilities.js b/src/capabilities.js index 05d8b4485..8b683c95c 100644 --- a/src/capabilities.js +++ b/src/capabilities.js @@ -147,6 +147,8 @@ export class Capabilities { #projectCreatorAuthCoreId #ownDeviceId + static NO_ROLE_CAPABILITIES = NO_ROLE_CAPABILITIES + /** * * @param {object} opts @@ -209,17 +211,21 @@ export class Capabilities { */ async getAll() { const roles = await this.#dataType.getMany() + /** @type {Record} */ + const capabilities = {} let projectCreatorDeviceId try { projectCreatorDeviceId = await this.#coreOwnership.getOwner( this.#projectCreatorAuthCoreId ) + // Default to creator capabilities, but can be overwritten if a different + // role is set below + capabilities[projectCreatorDeviceId] = CREATOR_CAPABILITIES } catch (e) { // Not found, we don't know who the project creator is so we can't include // them in the returned map } - /** @type {Record} */ - const capabilities = {} + for (const role of roles) { const deviceId = role.docId if (!isKnownRoleId(role.roleId)) continue diff --git a/src/core-manager/index.js b/src/core-manager/index.js index 3f085738d..9168449a1 100644 --- a/src/core-manager/index.js +++ b/src/core-manager/index.js @@ -2,14 +2,15 @@ import { TypedEmitter } from 'tiny-typed-emitter' import Corestore from 'corestore' import assert from 'node:assert' -import { once } from 'node:events' -import Hypercore from 'hypercore' import { HaveExtension, ProjectExtension } from '../generated/extensions.js' import { CoreIndex } from './core-index.js' -import { ReplicationStateMachine } from './replication-state-machine.js' import * as rle from './bitfield-rle.js' import { Logger } from '../logger.js' +import { keyToId } from '../utils.js' +import { discoveryKey } from 'hypercore-crypto' +import Hypercore from 'hypercore' +export const kCoreManagerReplicate = Symbol('replicate core manager') // WARNING: Changing these will break things for existing apps, since namespaces // are used for key derivation export const NAMESPACES = /** @type {const} */ ([ @@ -30,7 +31,6 @@ const CREATE_SQL = `CREATE TABLE IF NOT EXISTS ${TABLE} ( /** @typedef {(typeof NAMESPACES)[number]} Namespace */ /** @typedef {{ core: Core, key: Buffer, namespace: Namespace }} CoreRecord */ /** @typedef {import('streamx').Duplex} DuplexStream */ -/** @typedef {{ rsm: ReplicationStateMachine, stream: DuplexStream, cores: Set }} ReplicationRecord */ /** * @typedef {Object} Events * @property {(coreRecord: CoreRecord) => void} add-core @@ -48,8 +48,6 @@ export class CoreManager extends TypedEmitter { #projectKey #addCoreSqlStmt #encryptionKeys - /** @type {Set} */ - #replications = new Set() #projectExtension /** @type {'opened' | 'closing' | 'closed'} */ #state = 'opened' @@ -57,6 +55,13 @@ export class CoreManager extends TypedEmitter { #haveExtension #deviceId #l + /** + * We use this to reduce network traffic caused by requesting the same key + * from multiple clients. + * TODO: Remove items from this set after a max age + */ + #keyRequests = new TrackedKeyRequests() + #autoDownload static get namespaces() { return NAMESPACES @@ -70,6 +75,7 @@ export class CoreManager extends TypedEmitter { * @param {Buffer} [options.projectSecretKey] 32-byte secret key of the project creator core * @param {Partial>} [options.encryptionKeys] Encryption keys for each namespace * @param {import('hypercore').HypercoreStorage} options.storage Folder to store all hypercore data + * @param {boolean} [options.autoDownload=true] Immediately start downloading cores - should only be set to false for tests * @param {Logger} [options.logger] */ constructor({ @@ -79,6 +85,7 @@ export class CoreManager extends TypedEmitter { projectSecretKey, encryptionKeys = {}, storage, + autoDownload = true, logger, }) { super() @@ -90,11 +97,14 @@ export class CoreManager extends TypedEmitter { !projectSecretKey || projectSecretKey.length === 64, 'project owner core secret key must be 64-byte buffer' ) + // Each peer will attach a listener, so max listeners is max attached peers + this.setMaxListeners(0) this.#l = Logger.create('coreManager', logger) const primaryKey = keyManager.getDerivedKey('primaryKey', projectKey) this.#deviceId = keyManager.getIdentityKeypair().publicKey.toString('hex') this.#projectKey = projectKey this.#encryptionKeys = encryptionKeys + this.#autoDownload = autoDownload // Make sure table exists for persisting known cores sqlite.prepare(CREATE_SQL).run() @@ -161,6 +171,12 @@ export class CoreManager extends TypedEmitter { this.#creatorCore.on('peer-add', (peer) => { this.#sendHaves(peer) }) + this.#creatorCore.on('peer-remove', (peer) => { + // When a peer is removed we clean up any unanswered key requests, so that + // we will request from a different peer, and to avoid the tracking of key + // requests growing without bounds. + this.#keyRequests.deleteByPeerKey(peer.remotePublicKey) + }) this.#ready = Promise.all( [...this.#coreIndex].map(({ core }) => core.ready()) @@ -222,13 +238,13 @@ export class CoreManager extends TypedEmitter { * Get a core by its discovery key * * @param {Buffer} discoveryKey - * @returns {Core | undefined} + * @returns {CoreRecord | undefined} */ getCoreByDiscoveryKey(discoveryKey) { const coreRecord = this.#coreIndex.getByDiscoveryId( discoveryKey.toString('hex') ) - return coreRecord && coreRecord.core + return coreRecord } /** @@ -237,14 +253,11 @@ export class CoreManager extends TypedEmitter { */ async close() { this.#state = 'closing' + this.#keyRequests.clear() const promises = [] for (const { core } of this.#coreIndex) { promises.push(core.close()) } - for (const { stream } of this.#replications) { - promises.push(once(stream, 'close')) - stream.destroy() - } await Promise.all(promises) this.#state = 'closed' } @@ -279,6 +292,11 @@ export class CoreManager extends TypedEmitter { keyPair, encryptionKey: this.#encryptionKeys[namespace], }) + if (namespace !== 'blob' && this.#autoDownload) { + core.download({ start: 0, end: -1 }) + } + // Every peer adds a listener, so could have many peers + core.setMaxListeners(0) // @ts-ignore - ensure key is defined before hypercore is ready core.key = key this.#coreIndex.add({ core, key, namespace, writer }) @@ -308,13 +326,6 @@ export class CoreManager extends TypedEmitter { }) } - for (const { stream, rsm, cores } of this.#replications.values()) { - if (cores.has(core)) continue - if (rsm.state.enabledNamespaces.has(namespace)) { - core.replicate(stream) - } - } - if (persist) { this.#addCoreSqlStmt.run({ publicKey: key, namespace }) } @@ -331,140 +342,46 @@ export class CoreManager extends TypedEmitter { } /** - * Start replicating cores managed by CoreManager to a Noise Secret Stream (as - * created by @hyperswarm/secret-stream). Important: only one CoreManager can - * be replicated to a given stream - attempting to replicate a second - * CoreManager to the same stream will cause sharing of auth core keys to - * fail - see https://github.com/holepunchto/corestore/issues/45 - * - * Initially only cores in the `auth` namespace are replicated to the stream. - * All cores in the `auth` namespace are shared to all peers who have the - * `rootCoreKey` core, and also replicated to the stream + * Send an extension message over the project creator core replication stream + * requesting a core key for the given discovery key. * - * To start replicating other namespaces call `enableNamespace(ns)` on the - * returned state machine - * - * @param {import('../types.js').NoiseStream | import('../types.js').ProtocolStream} noiseStream framed noise secret stream, i.e. @hyperswarm/secret-stream - */ - replicate(noiseStream) { - if (this.#state !== 'opened') throw new Error('Core manager is closed') - if ( - /** @type {import('../types.js').ProtocolStream} */ (noiseStream) - .noiseStream?.userData - ) { - console.warn( - 'Passed an existing protocol stream to coreManager.replicate(). Other corestores and core managers replicated to this stream will no longer automatically inject shared cores into the stream' - ) - } - // @ts-expect-error - too complex to type right now - const stream = Hypercore.createProtocolStream(noiseStream) - const protocol = stream.noiseStream.userData - if (!protocol) throw new Error('Invalid stream') - // If the noise stream already had a protomux instance attached to - // noiseStream.userData, then Hypercore.createProtocolStream does not attach - // the ondiscoverykey listener, so we make sure we are listening for this, - // and that we override any previous notifier that was attached to protomux. - // This means that only one Core Manager instance can currently be - // replicated to a stream if we want sharing of unknown auth cores to work. - protocol.pair( - { protocol: 'hypercore/alpha' }, - /** @param {Buffer} discoveryKey */ async (discoveryKey) => { - this.handleDiscoveryKey(discoveryKey, stream) - } - ) - - /** @type {ReplicationRecord['cores']} */ - const replicatingCores = new Set() - const rsm = new ReplicationStateMachine({ - enableNamespace: (namespace) => { - for (const { core } of this.getCores(namespace)) { - if (replicatingCores.has(core)) continue - core.replicate(protocol) - replicatingCores.add(core) - } - }, - disableNamespace: (namespace) => { - for (const { core } of this.getCores(namespace)) { - if (core === this.creatorCore) continue - unreplicate(core, protocol) - replicatingCores.delete(core) - } - }, - }) - - // Always need to replicate the project creator core - this.creatorCore.replicate(protocol) - replicatingCores.add(this.creatorCore) - - // For now enable auth namespace here, rather than in sync controller - rsm.enableNamespace('auth') - - /** @type {ReplicationRecord} */ - const replicationRecord = { stream, rsm, cores: replicatingCores } - this.#replications.add(replicationRecord) - - stream.once('close', () => { - rsm.disableAll() - rsm.removeAllListeners() - this.#replications.delete(replicationRecord) - }) - - return rsm - } - - /** + * @param {Buffer} peerKey * @param {Buffer} discoveryKey - * @param {any} stream */ - async handleDiscoveryKey(discoveryKey, stream) { - const discoveryId = discoveryKey.toString('hex') - const peer = await this.#findPeer(stream.remotePublicKey) + requestCoreKey(peerKey, discoveryKey) { + // No-op if we already have this core + if (this.getCoreByDiscoveryKey(discoveryKey)) return + const peer = this.#creatorCore.peers.find((peer) => { + return peer.remotePublicKey.equals(peerKey) + }) if (!peer) { - console.warn('handleDiscovery no peer', stream.remotePublicKey) - // TODO: How to handle this and when does it happen? + // This should not happen because this is only called from SyncApi, which + // checks the peer exists before calling this method. + this.#l.log( + 'Attempted to request core key for %h, but no connected peer %h', + discoveryKey, + peerKey + ) return } - // If we already know about this core, then we will add it to the - // replication stream when we are ready - if (this.#coreIndex.getByDiscoveryId(discoveryId)) return + // Only request a key once, e.g. from the peer we first receive it from (we + // can assume that a peer must have the key if we see the discovery key in + // the protomux). This is necessary to reduce network traffic for many newly + // connected peers - otherwise duplicate requests will be sent to every peer + if (this.#keyRequests.has(discoveryKey)) return + this.#keyRequests.set(discoveryKey, peerKey) + + this.#l.log( + 'Requesting core key for discovery key %h from peer %h', + discoveryKey, + peerKey + ) const message = ProjectExtension.fromPartial({ wantCoreKeys: [discoveryKey], }) this.#projectExtension.send(message, peer) } - /** - * @param {Buffer} publicKey - * @param {{ timeout?: number }} [opts] - */ - async #findPeer(publicKey, { timeout = 200 } = {}) { - const creatorCore = this.#creatorCore - const peer = creatorCore.peers.find((peer) => { - return peer.remotePublicKey.equals(publicKey) - }) - if (peer) return peer - // This is called from the from the handleDiscoveryId event, which can - // happen before the peer connection is fully established, so we wait for - // the `peer-add` event, with a timeout in case the peer never gets added - return new Promise(function (res) { - const timeoutId = setTimeout(function () { - creatorCore.off('peer-add', onPeer) - res(null) - }, timeout) - - creatorCore.on('peer-add', onPeer) - - /** @param {any} peer */ - function onPeer(peer) { - if (peer.remotePublicKey.equals(publicKey)) { - clearTimeout(timeoutId) - creatorCore.off('peer-add', onPeer) - res(peer) - } - } - }) - } - /** * @param {ProjectExtension} msg * @param {any} peer @@ -473,8 +390,7 @@ export class CoreManager extends TypedEmitter { const message = ProjectExtension.create() let hasKeys = false for (const discoveryKey of wantCoreKeys) { - const discoveryId = discoveryKey.toString('hex') - const coreRecord = this.#coreIndex.getByDiscoveryId(discoveryId) + const coreRecord = this.getCoreByDiscoveryKey(discoveryKey) if (!coreRecord) continue message[`${coreRecord.namespace}CoreKeys`].push(coreRecord.key) hasKeys = true @@ -486,6 +402,7 @@ export class CoreManager extends TypedEmitter { for (const coreKey of coreKeys[`${namespace}CoreKeys`]) { // Use public method - these must be persisted (private method defaults to persisted=false) this.addCore(coreKey, namespace) + this.#keyRequests.deleteByDiscoveryKey(discoveryKey(coreKey)) } } } @@ -539,6 +456,28 @@ export class CoreManager extends TypedEmitter { peer.protomux.uncork() } + + /** + * ONLY FOR TESTING + * Replicate all cores in core manager + * + * @param {Parameters[0]} stream + * @returns + */ + [kCoreManagerReplicate](stream) { + const protocolStream = Hypercore.createProtocolStream(stream, { + ondiscoverykey: async (discoveryKey) => { + const peer = await findPeer( + this.creatorCore, + // @ts-ignore + protocolStream.noiseStream.remotePublicKey + ) + if (!peer) return + this.requestCoreKey(peer.remotePublicKey, discoveryKey) + }, + }) + return this.#corestore.replicate(stream) + } } /** @@ -585,16 +524,92 @@ const HaveExtensionCodec = { }, } +class TrackedKeyRequests { + /** @type {Map} */ + #byDiscoveryId = new Map() + /** @type {Map>} */ + #byPeerId = new Map() + + /** + * @param {Buffer} discoveryKey + * @param {Buffer} peerKey + */ + set(discoveryKey, peerKey) { + const discoveryId = keyToId(discoveryKey) + const peerId = keyToId(peerKey) + const existingForPeer = this.#byPeerId.get(peerId) || new Set() + this.#byDiscoveryId.set(discoveryId, peerId) + existingForPeer.add(discoveryId) + this.#byPeerId.set(peerId, existingForPeer) + return this + } + /** + * @param {Buffer} discoveryKey + */ + has(discoveryKey) { + const discoveryId = keyToId(discoveryKey) + return this.#byDiscoveryId.has(discoveryId) + } + /** + * @param {Buffer} discoveryKey + */ + deleteByDiscoveryKey(discoveryKey) { + const discoveryId = keyToId(discoveryKey) + const peerId = this.#byDiscoveryId.get(discoveryId) + if (!peerId) return false + this.#byDiscoveryId.delete(discoveryId) + const existingForPeer = this.#byPeerId.get(peerId) + if (existingForPeer) { + existingForPeer.delete(discoveryId) + } + return true + } + /** + * @param {Buffer} peerKey + */ + deleteByPeerKey(peerKey) { + const peerId = keyToId(peerKey) + const existingForPeer = this.#byPeerId.get(peerId) + if (!existingForPeer) return + for (const discoveryId of existingForPeer) { + this.#byDiscoveryId.delete(discoveryId) + } + this.#byPeerId.delete(peerId) + } + clear() { + this.#byDiscoveryId.clear() + this.#byPeerId.clear() + } +} + /** - * - * @param {Hypercore<'binary', any>} core - * @param {import('protomux')} protomux + * @param {Hypercore<"binary", Buffer>} core + * @param {Buffer} publicKey + * @param {{ timeout?: number }} [opts] */ -export function unreplicate(core, protomux) { - const peerToUnreplicate = core.peers.find( - (peer) => peer.protomux === protomux - ) - if (!peerToUnreplicate) return - peerToUnreplicate.channel.close() - return +function findPeer(core, publicKey, { timeout = 200 } = {}) { + const peer = core.peers.find((peer) => { + return peer.remotePublicKey.equals(publicKey) + }) + if (peer) return peer + // This is called from the from the handleDiscoveryId event, which can + // happen before the peer connection is fully established, so we wait for + // the `peer-add` event, with a timeout in case the peer never gets added + return new Promise(function (res) { + const timeoutId = setTimeout(function () { + core.off('peer-add', onPeer) + res(null) + }, timeout) + + core.on('peer-add', onPeer) + + /** @param {any} peer */ + function onPeer(peer) { + if (peer.remotePublicKey.equals(publicKey)) { + clearTimeout(timeoutId) + core.off('peer-add', onPeer) + res(peer) + } + } + }) } diff --git a/src/core-manager/replication-state-machine.js b/src/core-manager/replication-state-machine.js deleted file mode 100644 index a71dfe61d..000000000 --- a/src/core-manager/replication-state-machine.js +++ /dev/null @@ -1,76 +0,0 @@ -import { TypedEmitter } from 'tiny-typed-emitter' - -/** @typedef {import('./index.js').Namespace} Namespace */ -/** @typedef {Set} EnabledNamespaces */ -/** @typedef {{ enabledNamespaces: EnabledNamespaces }} ReplicationState */ - -/** - * @typedef {object} StateMachineEvents - * @property {(state: ReplicationState) => void } state - */ - -/** - * A simple state machine to manage which namespaces are enabled for replication - * - * @extends {TypedEmitter} - */ -export class ReplicationStateMachine extends TypedEmitter { - /** @type {ReplicationState} */ - #state = { - enabledNamespaces: new Set(), - } - #enableNamespace - #disableNamespace - - /** - * - * @param {object} opts - * @param {(namespace: Namespace) => void} opts.enableNamespace - * @param {(namespace: Namespace) => void} opts.disableNamespace - */ - constructor({ enableNamespace, disableNamespace }) { - super() - this.#enableNamespace = enableNamespace - this.#disableNamespace = disableNamespace - } - - get state() { - return this.#state - } - - /** - * Enable a namespace for replication - will add known cores in the namespace - * to the replication stream - * - * @param {Namespace} namespace */ - enableNamespace(namespace) { - if (this.#state.enabledNamespaces.has(namespace)) return - this.#state.enabledNamespaces.add(namespace) - this.#enableNamespace(namespace) - this.emit('state', this.#state) - } - - /** - * Disable a namespace for replication - will remove cores in the namespace - * from the replication stream - * - * @param {Namespace} namespace - */ - disableNamespace(namespace) { - if (!this.#state.enabledNamespaces.has(namespace)) return - this.#state.enabledNamespaces.delete(namespace) - this.#disableNamespace(namespace) - this.emit('state', this.#state) - } - - /** - * @internal - * Should only be called when the stream is closed, because no obvious way to - * implement this otherwise. - */ - disableAll() { - if (!this.#state.enabledNamespaces.size) return - this.#state.enabledNamespaces.clear() - this.emit('state', this.#state) - } -} diff --git a/src/datastore/index.js b/src/datastore/index.js index 4655c0c06..be98ffcf1 100644 --- a/src/datastore/index.js +++ b/src/datastore/index.js @@ -66,6 +66,10 @@ export class DataStore extends TypedEmitter { storage, batch: (entries) => this.#handleEntries(entries), }) + coreManager.on('add-core', (coreRecord) => { + if (coreRecord.namespace !== namespace) return + this.#coreIndexer.addCore(coreRecord.core) + }) // Forward events from coreIndexer this.on('newListener', (eventName, listener) => { @@ -164,9 +168,9 @@ export class DataStore extends TypedEmitter { */ async read(versionId) { const { coreDiscoveryKey, index } = parseVersionId(versionId) - const core = this.#coreManager.getCoreByDiscoveryKey(coreDiscoveryKey) - if (!core) throw new Error('Invalid versionId') - const block = await core.get(index, { wait: false }) + const coreRecord = this.#coreManager.getCoreByDiscoveryKey(coreDiscoveryKey) + if (!coreRecord) throw new Error('Invalid versionId') + const block = await coreRecord.core.get(index, { wait: false }) if (!block) throw new Error('Not Found') return decode(block, { coreDiscoveryKey, index }) } @@ -186,9 +190,9 @@ export class DataStore extends TypedEmitter { /** @param {string} versionId */ async readRaw(versionId) { const { coreDiscoveryKey, index } = parseVersionId(versionId) - const core = this.#coreManager.getCoreByDiscoveryKey(coreDiscoveryKey) - if (!core) throw new Error('core not found') - const block = await core.get(index, { wait: false }) + const coreRecord = this.#coreManager.getCoreByDiscoveryKey(coreDiscoveryKey) + if (!coreRecord) throw new Error('core not found') + const block = await coreRecord.core.get(index, { wait: false }) if (!block) throw new Error('Not Found') return block } diff --git a/src/local-peers.js b/src/local-peers.js index aef72c678..fe3df44e6 100644 --- a/src/local-peers.js +++ b/src/local-peers.js @@ -207,7 +207,7 @@ class Peer { * @property {(peers: PeerInfo[]) => void} peers Emitted whenever the connection status of peers changes. An array of peerInfo objects with a peer id and the peer connection status * @property {(peer: PeerInfoConnected) => void} peer-add Emitted when a new peer is connected * @property {(peerId: string, invite: InviteWithKeys) => void} invite Emitted when an invite is received - * @property {(discoveryKey: Buffer, stream: import('./types.js').ReplicationStream) => void} discovery-key Emitted when a new hypercore is replicated (by a peer) to a peer replication stream (passed as the second parameter) + * @property {(discoveryKey: Buffer, protomux: Protomux) => void} discovery-key Emitted when a new hypercore is replicated (by a peer) to a peer protomux instance (passed as the second parameter) */ /** @extends {TypedEmitter} */ @@ -347,7 +347,7 @@ export class LocalPeers extends TypedEmitter { discoveryKey, stream.noiseStream.remotePublicKey ) - this.emit('discovery-key', discoveryKey, outerStream) + this.emit('discovery-key', discoveryKey, protomux) } ) diff --git a/src/logger.js b/src/logger.js index 356e6cc35..80de7b9db 100644 --- a/src/logger.js +++ b/src/logger.js @@ -1,23 +1,51 @@ import createDebug from 'debug' import { discoveryKey } from 'hypercore-crypto' +import mapObject from 'map-obj' +import util from 'util' const TRIM = 7 -createDebug.formatters.h = (v) => { +createDebug.formatters.h = function (v) { if (!Buffer.isBuffer(v)) return '[undefined]' return v.toString('hex').slice(0, TRIM) } -createDebug.formatters.S = (v) => { +createDebug.formatters.S = function (v) { if (typeof v !== 'string') return '[undefined]' return v.slice(0, 7) } -createDebug.formatters.k = (v) => { +createDebug.formatters.k = function (v) { if (!Buffer.isBuffer(v)) return '[undefined]' return discoveryKey(v).toString('hex').slice(0, TRIM) } +/** + * @param {import('./sync/sync-state.js').State} v + * @this {any} */ +createDebug.formatters.X = function (v) { + try { + const mapped = mapObject(v, (k, v) => [ + k, + // @ts-ignore - type checks here don't get us anything + mapObject(v, (k, v) => { + if (k === 'remoteStates') + // @ts-ignore - type checks here don't get us anything + return [k, mapObject(v, (k, v) => [k.slice(0, 7), v])] + return [k, v] + }), + ]) + return util.inspect(mapped, { + colors: true, + depth: 10, + compact: 6, + breakLength: 90, + }) + } catch (e) { + return `[ERROR: $(e.message)]` + } +} + const counts = new Map() export class Logger { diff --git a/src/mapeo-manager.js b/src/mapeo-manager.js index d90e78d12..660ba2717 100644 --- a/src/mapeo-manager.js +++ b/src/mapeo-manager.js @@ -30,6 +30,8 @@ import { LocalPeers } from './local-peers.js' import { InviteApi } from './invite-api.js' import { MediaServer } from './media-server.js' import { LocalDiscovery } from './discovery/local-discovery.js' +import { Capabilities } from './capabilities.js' +import NoiseSecretStream from '@hyperswarm/secret-stream' import { Logger } from './logger.js' /** @typedef {import("@mapeo/schema").ProjectSettingsValue} ProjectValue */ @@ -74,6 +76,7 @@ export class MapeoManager extends TypedEmitter { #invite #mediaServer #localDiscovery + #loggerBase #l /** @@ -96,7 +99,8 @@ export class MapeoManager extends TypedEmitter { super() this.#keyManager = new KeyManager(rootKey) this.#deviceId = getDeviceId(this.#keyManager) - this.#l = new Logger({ deviceId: this.#deviceId, ns: 'manager' }) + const logger = (this.#loggerBase = new Logger({ deviceId: this.#deviceId })) + this.#l = Logger.create('manager', logger) this.#dbFolder = dbFolder this.#projectMigrationsFolder = projectMigrationsFolder const sqlite = new Database( @@ -107,15 +111,20 @@ export class MapeoManager extends TypedEmitter { this.#db = drizzle(sqlite) migrate(this.#db, { migrationsFolder: clientMigrationsFolder }) - this.#localPeers = new LocalPeers({ logger: this.#l }) + this.#localPeers = new LocalPeers({ logger }) this.#localPeers.on('peers', (peers) => { this.emit('local-peers', omitPeerProtomux(peers)) }) + this.#localPeers.on('discovery-key', (dk) => { + if (this.#activeProjects.size === 0) { + this.#l.log('Received dk %h but no active projects', dk) + } + }) this.#projectSettingsIndexWriter = new IndexWriter({ tables: [projectSettingsTable], sqlite, - logger: this.#l, + logger, }) this.#activeProjects = new Map() @@ -152,9 +161,9 @@ export class MapeoManager extends TypedEmitter { this.#localDiscovery = new LocalDiscovery({ identityKeypair: this.#keyManager.getIdentityKeypair(), - logger: this.#l, + logger, }) - this.#localDiscovery.on('connection', this[kManagerReplicate].bind(this)) + this.#localDiscovery.on('connection', this.#replicate.bind(this)) } /** @@ -169,15 +178,25 @@ export class MapeoManager extends TypedEmitter { } /** - * Replicate Mapeo to a `@hyperswarm/secret-stream`. This replication connects - * the Mapeo RPC channel and allows invites. All active projects will sync - * automatically to this replication stream. Only use for local (trusted) - * connections, because the RPC channel key is public. To sync a specific - * project without connecting RPC, use project[kProjectReplication]. + * Create a Mapeo replication stream. This replication connects the Mapeo RPC + * channel and allows invites. All active projects will sync automatically to + * this replication stream. Only use for local (trusted) connections, because + * the RPC channel key is public. To sync a specific project without + * connecting RPC, use project[kProjectReplication]. * - * @param {import('@hyperswarm/secret-stream')} noiseStream + * @param {boolean} isInitiator */ - [kManagerReplicate](noiseStream) { + [kManagerReplicate](isInitiator) { + const noiseStream = new NoiseSecretStream(isInitiator, undefined, { + keyPair: this.#keyManager.getIdentityKeypair(), + }) + return this.#replicate(noiseStream) + } + + /** + * @param {NoiseSecretStream} noiseStream + */ + #replicate(noiseStream) { const replicationStream = this.#localPeers.connect(noiseStream) Promise.all([this.getDeviceInfo(), openedNoiseSecretStream(noiseStream)]) .then(([{ name }, openedNoiseStream]) => { @@ -367,7 +386,7 @@ export class MapeoManager extends TypedEmitter { sharedDb: this.#db, sharedIndexWriter: this.#projectSettingsIndexWriter, localPeers: this.#localPeers, - logger: this.#l, + logger: this.#loggerBase, getMediaBaseUrl: this.#mediaServer.getMediaAddress.bind( this.#mediaServer ), @@ -426,10 +445,18 @@ export class MapeoManager extends TypedEmitter { } /** + * Add a project to this device. After adding a project the client should + * await `project.$waitForInitialSync()` to ensure that the device has + * downloaded their proof of project membership and the project config. + * * @param {import('./generated/rpc.js').Invite} invite + * @param {{ waitForSync?: boolean }} [opts] For internal use in tests, set opts.waitForSync = false to not wait for sync during addProject() * @returns {Promise} */ - async addProject({ projectKey, encryptionKeys, projectInfo }) { + async addProject( + { projectKey, encryptionKeys, projectInfo }, + { waitForSync = true } = {} + ) { const projectPublicId = projectKeyToPublicId(projectKey) // 1. Check for an active project @@ -453,10 +480,9 @@ export class MapeoManager extends TypedEmitter { throw new Error(`Project with ID ${projectPublicId} already exists`) } - // TODO: Relies on completion of https://github.com/digidem/mapeo-core-next/issues/233 - // 3. Sync auth + config cores + // No awaits here - need to update table in same tick as the projectExists check - // 4. Update the project keys table + // 3. Update the project keys table this.#saveToProjectKeysTable({ projectId, projectPublicId, @@ -467,17 +493,108 @@ export class MapeoManager extends TypedEmitter { projectInfo, }) - // 5. Write device info into project - const deviceInfo = await this.getDeviceInfo() - - if (deviceInfo.name) { + // Any errors from here we need to remove project from db because it has not + // been fully added and synced + try { + // 4. Write device info into project const project = await this.getProject(projectPublicId) - await project[kSetOwnDeviceInfo]({ name: deviceInfo.name }) + await project.ready() + + try { + const deviceInfo = await this.getDeviceInfo() + if (deviceInfo.name) { + await project[kSetOwnDeviceInfo]({ name: deviceInfo.name }) + } + } catch (e) { + // Can ignore an error trying to write device info + this.#l.log( + 'ERROR: failed to write project %h deviceInfo %o', + projectKey, + e + ) + } + + // 5. Wait for initial project sync + if (waitForSync) { + await this.#waitForInitialSync(project) + } + + this.#activeProjects.set(projectPublicId, project) + } catch (e) { + this.#l.log('ERROR: could not add project', e) + this.#db + .delete(projectKeysTable) + .where(eq(projectKeysTable.projectId, projectId)) + .run() + throw e } - + this.#l.log('Added project %h, public ID: %S', projectKey, projectPublicId) return projectPublicId } + /** + * Sync initial data: the `auth` cores which contain the capability messages, + * and the `config` cores which contain the project name & custom config (if + * it exists). The API consumer should await this after `client.addProject()` + * to ensure that the device is fully added to the project. + * + * @param {MapeoProject} project + * @param {object} [opts] + * @param {number} [opts.timeoutMs=5000] Timeout in milliseconds for max time + * to wait between sync status updates before giving up. As long as syncing is + * happening, this will never timeout, but if more than timeoutMs passes + * without any sync activity, then this will resolve `false` e.g. data has not + * synced + * @returns {Promise} + */ + async #waitForInitialSync(project, { timeoutMs = 5000 } = {}) { + await project.ready() + const [capability, projectSettings] = await Promise.all([ + project.$getOwnCapabilities(), + project.$getProjectSettings(), + ]) + const { + auth: { localState: authState }, + config: { localState: configState }, + } = project.$sync.getState() + const isCapabilitySynced = capability !== Capabilities.NO_ROLE_CAPABILITIES + const isProjectSettingsSynced = + projectSettings !== MapeoProject.EMPTY_PROJECT_SETTINGS + // Assumes every project that someone is invited to has at least one record + // in the auth store - the capability record for the invited device + const isAuthSynced = authState.want === 0 && authState.have > 0 + // Assumes every project that someone is invited to has at least one record + // in the config store - defining the name of the project. + // TODO: Enforce adding a project name in the invite method + const isConfigSynced = configState.want === 0 && configState.have > 0 + if ( + isCapabilitySynced && + isProjectSettingsSynced && + isAuthSynced && + isConfigSynced + ) { + return true + } + return new Promise((resolve, reject) => { + /** @param {import('./sync/sync-state.js').State} syncState */ + const onSyncState = (syncState) => { + clearTimeout(timeoutId) + if (syncState.auth.dataToSync || syncState.config.dataToSync) { + timeoutId = setTimeout(onTimeout, timeoutMs) + return + } + project.$sync.off('sync-state', onSyncState) + resolve(this.#waitForInitialSync(project, { timeoutMs })) + } + const onTimeout = () => { + project.$sync.off('sync-state', onSyncState) + reject(new Error('Sync timeout')) + } + let timeoutId = setTimeout(onTimeout, timeoutMs) + project.$sync.on('sync-state', onSyncState) + }) + } + /** * @template {import('type-fest').Exact} T * @param {T} deviceInfo @@ -501,6 +618,7 @@ export class MapeoManager extends TypedEmitter { await project[kSetOwnDeviceInfo](deviceInfo) }) ) + this.#l.log('set device info %o', deviceInfo) } /** @@ -533,6 +651,15 @@ export class MapeoManager extends TypedEmitter { await this.#mediaServer.stop() } + async startLocalPeerDiscovery() { + return this.#localDiscovery.start() + } + + /** @type {LocalDiscovery['stop']} */ + async stopLocalPeerDiscovery(opts) { + return this.#localDiscovery.stop(opts) + } + /** * @returns {Promise} */ diff --git a/src/mapeo-project.js b/src/mapeo-project.js index 5d5852254..e3ada1a75 100644 --- a/src/mapeo-project.js +++ b/src/mapeo-project.js @@ -36,20 +36,21 @@ import { valueOf, } from './utils.js' import { MemberApi } from './member-api.js' -import { IconApi } from './icon-api.js' -import { SyncApi, kSyncReplicate } from './sync/sync-api.js' -import Hypercore from 'hypercore' +import { SyncApi, kHandleDiscoveryKey } from './sync/sync-api.js' import { Logger } from './logger.js' +import { IconApi } from './icon-api.js' /** @typedef {Omit} EditableProjectSettings */ const CORESTORE_STORAGE_FOLDER_NAME = 'corestore' const INDEXER_STORAGE_FOLDER_NAME = 'indexer' +export const kCoreManager = Symbol('coreManager') export const kCoreOwnership = Symbol('coreOwnership') export const kCapabilities = Symbol('capabilities') export const kSetOwnDeviceInfo = Symbol('kSetOwnDeviceInfo') export const kBlobStore = Symbol('blobStore') export const kProjectReplicate = Symbol('replicate project') +const EMPTY_PROJECT_SETTINGS = Object.freeze({}) export class MapeoProject { #projectId @@ -66,6 +67,8 @@ export class MapeoProject { #syncApi #l + static EMPTY_PROJECT_SETTINGS = EMPTY_PROJECT_SETTINGS + /** * @param {Object} opts * @param {string} opts.dbPath Path to store project sqlite db. Use `:memory:` for memory storage @@ -278,26 +281,25 @@ export class MapeoProject { logger: this.#l, }) - ///////// 4. Wire up sync + ///////// 4. Replicate local peers automatically // Replicate already connected local peers for (const peer of localPeers.peers) { if (peer.status !== 'connected') continue - this.#syncApi[kSyncReplicate](peer.protomux) + this.#coreManager.creatorCore.replicate(peer.protomux) } - localPeers.on('discovery-key', (discoveryKey, stream) => { - // The core identified by this discovery key might not be part of this - // project, but we can't know that so we will request it from the peer if - // we don't have it. The peer will not return the core key unless it _is_ - // part of this project - this.#coreManager.handleDiscoveryKey(discoveryKey, stream) - }) - // When a new peer is found, try to replicate (if it is not a member of the // project it will fail the capability check and be ignored) localPeers.on('peer-add', (peer) => { - this.#syncApi[kSyncReplicate](peer.protomux) + this.#coreManager.creatorCore.replicate(peer.protomux) + }) + + // This happens whenever a peer replicates a core to the stream. SyncApi + // handles replicating this core if we also have it, or requesting the key + // for the core. + localPeers.on('discovery-key', (discoveryKey, stream) => { + this.#syncApi[kHandleDiscoveryKey](discoveryKey, stream) }) ///////// 5. Write core ownership record @@ -324,6 +326,13 @@ export class MapeoProject { this.#l.log('Created project instance %h', projectKey) } + /** + * CoreManager instance, used for tests + */ + get [kCoreManager]() { + return this.#coreManager + } + /** * CoreOwnership instance, used for tests */ @@ -447,7 +456,7 @@ export class MapeoProject { ) } catch (e) { this.#l.log('No project settings') - return /** @type {EditableProjectSettings} */ ({}) + return /** @type {EditableProjectSettings} */ (EMPTY_PROJECT_SETTINGS) } } @@ -461,18 +470,21 @@ export class MapeoProject { * and only this project will replicate (to replicate multiple projects you * need to replicate the manager instance via manager[kManagerReplicate]) * - * @param {Exclude[0], boolean>} stream A duplex stream, a @hyperswarm/secret-stream, or a Protomux instance + * @param {Parameters[0]} stream A duplex stream, a @hyperswarm/secret-stream, or a Protomux instance * @returns */ [kProjectReplicate](stream) { - const replicationStream = Hypercore.createProtocolStream(stream, { + // @ts-expect-error - hypercore types need updating + const replicationStream = this.#coreManager.creatorCore.replicate(stream, { + // @ts-ignore - hypercore types do not currently include this option ondiscoverykey: async (discoveryKey) => { - this.#coreManager.handleDiscoveryKey(discoveryKey, replicationStream) + const protomux = + /** @type {import('protomux')} */ ( + replicationStream.noiseStream.userData + ) + this.#syncApi[kHandleDiscoveryKey](discoveryKey, protomux) }, }) - const protomux = replicationStream.noiseStream.userData - // @ts-ignore - got fed up jumping through hoops to keep TS heppy - this.#syncApi[kSyncReplicate](protomux) return replicationStream } diff --git a/src/sync/core-sync-state.js b/src/sync/core-sync-state.js index ec382656f..6872dd53c 100644 --- a/src/sync/core-sync-state.js +++ b/src/sync/core-sync-state.js @@ -93,10 +93,13 @@ export class CoreSyncState { if (this.#core) return this.#core = core - this.#localState.setHavesBitfield( - // @ts-ignore - internal property - core?.core?.bitfield - ) + + this.#core.ready().then(() => { + this.#localState.setHavesBitfield( + // @ts-ignore - internal property + core?.core?.bitfield + ) + }) for (const peer of this.#core.peers) { this.#onPeerAdd(peer) diff --git a/src/sync/namespace-sync-state.js b/src/sync/namespace-sync-state.js index 3ffd6769f..ff2dec969 100644 --- a/src/sync/namespace-sync-state.js +++ b/src/sync/namespace-sync-state.js @@ -2,7 +2,7 @@ import { CoreSyncState } from './core-sync-state.js' import { discoveryKey } from 'hypercore-crypto' /** - * @typedef {Omit} SyncState + * @typedef {Omit & { dataToSync: boolean, coreCount: number }} SyncState */ /** @@ -11,6 +11,7 @@ import { discoveryKey } from 'hypercore-crypto' export class NamespaceSyncState { /** @type {Map} */ #coreStates = new Map() + #coreCount = 0 #handleUpdate #namespace /** @type {SyncState | null} */ @@ -55,6 +56,8 @@ export class NamespaceSyncState { if (this.#cachedState) return this.#cachedState /** @type {SyncState} */ const state = { + dataToSync: false, + coreCount: this.#coreCount, localState: createState(), remoteStates: {}, } @@ -71,6 +74,9 @@ export class NamespaceSyncState { } } } + if (state.localState.want > 0 || state.localState.wanted > 0) { + state.dataToSync = true + } this.#cachedState = state return state } @@ -82,6 +88,7 @@ export class NamespaceSyncState { #addCore(core, coreKey) { const discoveryId = discoveryKey(coreKey).toString('hex') this.#getCoreState(discoveryId).attachCore(core) + this.#coreCount++ } /** diff --git a/src/sync/peer-sync-controller.js b/src/sync/peer-sync-controller.js index 306525b8e..d1d55aa09 100644 --- a/src/sync/peer-sync-controller.js +++ b/src/sync/peer-sync-controller.js @@ -10,7 +10,7 @@ import { Logger } from '../logger.js' */ /** @type {Namespace[]} */ -const PRESYNC_NAMESPACES = ['auth', 'config', 'blobIndex'] +export const PRESYNC_NAMESPACES = ['auth', 'config', 'blobIndex'] export class PeerSyncController { #replicatingCores = new Set() @@ -34,7 +34,7 @@ export class PeerSyncController { /** * @param {object} opts - * @param {import("protomux")} opts.protomux + * @param {import("protomux")} opts.protomux * @param {import("../core-manager/index.js").CoreManager} opts.coreManager * @param {import("./sync-state.js").SyncState} opts.syncState * @param {import("../capabilities.js").Capabilities} opts.capabilities @@ -55,8 +55,7 @@ export class PeerSyncController { this.#capabilities = capabilities // Always need to replicate the project creator core - coreManager.creatorCore.replicate(protomux) - this.#replicatingCores.add(coreManager.creatorCore) + this.#replicateCore(coreManager.creatorCore) coreManager.on('add-core', this.#handleAddCore) syncState.on('state', this.#handleStateChange) @@ -69,7 +68,11 @@ export class PeerSyncController { } get peerId() { - return this.peerKey?.toString('hex') + return this.peerKey.toString('hex') + } + + get syncCapability() { + return this.#syncCapability } /** @@ -91,6 +94,31 @@ export class PeerSyncController { this.#updateEnabledNamespaces() } + /** + * @param {Buffer} discoveryKey + */ + handleDiscoveryKey(discoveryKey) { + const coreRecord = this.#coreManager.getCoreByDiscoveryKey(discoveryKey) + // If we already know about this core, then we will add it to the + // replication stream when we are ready + if (coreRecord) { + this.#log( + 'Received discovery key %h, but already have core in namespace %s', + discoveryKey, + coreRecord.namespace + ) + if (this.#enabledNamespaces.has(coreRecord.namespace)) { + this.#replicateCore(coreRecord.core) + } + return + } + if (!this.peerKey) { + this.#log('Unexpected null peerKey') + return + } + this.#coreManager.requestCoreKey(this.peerKey, discoveryKey) + } + /** * Handler for 'core-add' event from CoreManager * Bound to `this` (defined as static property) @@ -113,13 +141,12 @@ export class PeerSyncController { // connected. We shouldn't get a state change before the noise stream has // connected, but if we do we can ignore it because we won't have any useful // information until it connects. - if (!this.#protomux.stream.remotePublicKey) return - const peerId = this.#protomux.stream.remotePublicKey.toString('hex') - this.#syncStatus = getSyncStatus(peerId, state) + if (!this.peerId) return + this.#syncStatus = getSyncStatus(this.peerId, state) const localState = mapObject(state, (ns, nsState) => { return [ns, nsState.localState] }) - this.#log('state %O', state) + this.#log('state %X', state) // Map of which namespaces have received new data since last sync change const didUpdate = mapObject(state, (ns) => { @@ -139,12 +166,12 @@ export class PeerSyncController { if (didUpdate.auth) { try { - const cap = await this.#capabilities.getCapabilities(peerId) + const cap = await this.#capabilities.getCapabilities(this.peerId) this.#syncCapability = cap.sync } catch (e) { this.#log('Error reading capability', e) - // Any error, consider sync blocked - this.#syncCapability = createNamespaceMap('blocked') + // Any error, consider sync unknown + this.#syncCapability = createNamespaceMap('unknown') } } this.#log('capability %o', this.#syncCapability) @@ -197,7 +224,12 @@ export class PeerSyncController { */ #replicateCore(core) { if (this.#replicatingCores.has(core)) return + this.#log('replicating core %k', core.key) core.replicate(this.#protomux) + core.on('peer-remove', (peer) => { + if (!peer.remotePublicKey.equals(this.peerKey)) return + this.#log('peer-remove %h from core %k', peer.remotePublicKey, core.key) + }) this.#replicatingCores.add(core) } @@ -215,32 +247,13 @@ export class PeerSyncController { this.#replicatingCores.delete(core) } - /** - * @param {import('hypercore')<'binary', any>} core - */ - #downloadCore(core) { - if (this.#downloadingRanges.has(core)) return - const range = core.download({ start: 0, end: -1 }) - this.#downloadingRanges.set(core, range) - } - - /** - * @param {import('hypercore')<'binary', any>} core - */ - #undownloadCore(core) { - const range = this.#downloadingRanges.get(core) - if (!range) return - range.destroy() - this.#downloadingRanges.delete(core) - } - /** * @param {Namespace} namespace */ #enableNamespace(namespace) { + if (this.#enabledNamespaces.has(namespace)) return for (const { core } of this.#coreManager.getCores(namespace)) { this.#replicateCore(core) - this.#downloadCore(core) } this.#enabledNamespaces.add(namespace) this.#log('enabled namespace %s', namespace) @@ -250,9 +263,9 @@ export class PeerSyncController { * @param {Namespace} namespace */ #disableNamespace(namespace) { + if (!this.#enabledNamespaces.has(namespace)) return for (const { core } of this.#coreManager.getCores(namespace)) { this.#unreplicateCore(core) - this.#undownloadCore(core) } this.#enabledNamespaces.delete(namespace) this.#log('disabled namespace %s', namespace) diff --git a/src/sync/sync-api.js b/src/sync/sync-api.js index 5eaf685db..018de6e9b 100644 --- a/src/sync/sync-api.js +++ b/src/sync/sync-api.js @@ -1,9 +1,14 @@ import { TypedEmitter } from 'tiny-typed-emitter' import { SyncState } from './sync-state.js' -import { PeerSyncController } from './peer-sync-controller.js' +import { + PeerSyncController, + PRESYNC_NAMESPACES, +} from './peer-sync-controller.js' import { Logger } from '../logger.js' +import { NAMESPACES } from '../core-manager/index.js' +import { keyToId } from '../utils.js' -export const kSyncReplicate = Symbol('replicate sync') +export const kHandleDiscoveryKey = Symbol('handle discovery key') /** * @typedef {object} SyncEvents @@ -19,8 +24,12 @@ export class SyncApi extends TypedEmitter { #capabilities /** @type {Map} */ #peerSyncControllers = new Map() + /** @type {Set} */ + #peerIds = new Set() /** @type {Set<'local' | 'remote'>} */ #dataSyncEnabled = new Set() + /** @type {Map>} */ + #pendingDiscoveryKeys = new Map() #l /** @@ -37,7 +46,39 @@ export class SyncApi extends TypedEmitter { this.#coreManager = coreManager this.#capabilities = capabilities this.syncState = new SyncState({ coreManager, throttleMs }) + this.syncState.setMaxListeners(0) this.syncState.on('state', this.emit.bind(this, 'sync-state')) + + this.#coreManager.creatorCore.on('peer-add', this.#handlePeerAdd) + this.#coreManager.creatorCore.on('peer-remove', this.#handlePeerRemove) + } + + /** @type {import('../local-peers.js').LocalPeersEvents['discovery-key']} */ + [kHandleDiscoveryKey](discoveryKey, protomux) { + const peerSyncController = this.#peerSyncControllers.get(protomux) + if (peerSyncController) { + peerSyncController.handleDiscoveryKey(discoveryKey) + return + } + // We will reach here if we are not part of the project, so we can ignore + // these keys. However it's also possible to reach here when we are part of + // a project, but the creator core `peer-add` event has not yet fired, so we + // queue this to be handled in `#handlePeerAdd` + const peerQueue = this.#pendingDiscoveryKeys.get(protomux) || new Set() + peerQueue.add(discoveryKey) + this.#pendingDiscoveryKeys.set(protomux, peerQueue) + + // If we _are_ part of the project, the `peer-add` should happen very soon + // after we get a discovery-key event, so we cleanup our queue to avoid + // memory leaks for any discovery keys that have not been handled. + setTimeout(() => { + const peerQueue = this.#pendingDiscoveryKeys.get(protomux) + if (!peerQueue) return + peerQueue.delete(discoveryKey) + if (peerQueue.size === 0) { + this.#pendingDiscoveryKeys.delete(protomux) + } + }, 500) } getState() { @@ -69,9 +110,33 @@ export class SyncApi extends TypedEmitter { } /** - * @param {import('protomux')} protomux A protomux instance + * @param {'initial' | 'full'} type + */ + async waitForSync(type) { + const state = this.getState() + const namespaces = type === 'initial' ? PRESYNC_NAMESPACES : NAMESPACES + if (isSynced(state, namespaces, this.#peerSyncControllers)) return + return new Promise((res) => { + this.on('sync-state', function onState(state) { + if (!isSynced(state, namespaces, this.#peerSyncControllers)) return + this.off('sync-state', onState) + res(null) + }) + }) + } + + /** + * Bound to `this` + * + * This will be called whenever a peer is successfully added to the creator + * core, which means that the peer has the project key. The PeerSyncController + * will then handle validation of role records to ensure that the peer is + * actually still part of the project. + * + * @param {{ protomux: import('protomux') }} peer */ - [kSyncReplicate](protomux) { + #handlePeerAdd = (peer) => { + const { protomux } = peer if (this.#peerSyncControllers.has(protomux)) { this.#l.log( 'Unexpected existing peer sync controller for peer %h', @@ -79,7 +144,6 @@ export class SyncApi extends TypedEmitter { ) return } - const peerSyncController = new PeerSyncController({ protomux, coreManager: this.#coreManager, @@ -87,9 +151,61 @@ export class SyncApi extends TypedEmitter { capabilities: this.#capabilities, logger: this.#l, }) + this.#peerSyncControllers.set(protomux, peerSyncController) + if (peerSyncController.peerId) this.#peerIds.add(peerSyncController.peerId) + if (this.#dataSyncEnabled.has('local')) { peerSyncController.enableDataSync() } - this.#peerSyncControllers.set(protomux, peerSyncController) + + const peerQueue = this.#pendingDiscoveryKeys.get(protomux) + if (peerQueue) { + for (const discoveryKey of peerQueue) { + peerSyncController.handleDiscoveryKey(discoveryKey) + } + this.#pendingDiscoveryKeys.delete(protomux) + } + } + + /** + * Bound to `this` + * + * Called when a peer is removed from the creator core, e.g. when the + * connection is terminated. + * + * @param {{ protomux: import('protomux'), remotePublicKey: Buffer }} peer + */ + #handlePeerRemove = (peer) => { + const { protomux } = peer + if (!this.#peerSyncControllers.has(protomux)) { + this.#l.log( + 'Unexpected no existing peer sync controller for peer %h', + protomux.stream.remotePublicKey + ) + return + } + this.#peerSyncControllers.delete(protomux) + this.#peerIds.delete(keyToId(peer.remotePublicKey)) + this.#pendingDiscoveryKeys.delete(protomux) + } +} + +/** + * Is the sync state "synced", e.g. is there nothing left to sync + * + * @param {import('./sync-state.js').State} state + * @param {readonly import('../core-manager/index.js').Namespace[]} namespaces + * @param {Map} peerSyncControllers + */ +function isSynced(state, namespaces, peerSyncControllers) { + for (const ns of namespaces) { + if (state[ns].dataToSync) return false + for (const psc of peerSyncControllers.values()) { + const { peerId } = psc + if (psc.syncCapability[ns] === 'blocked') continue + if (!(peerId in state[ns].remoteStates)) return false + if (state[ns].remoteStates[peerId].status === 'connecting') return false + } } + return true } diff --git a/src/utils.js b/src/utils.js index 07fc064ea..1da1b0d5d 100644 --- a/src/utils.js +++ b/src/utils.js @@ -85,7 +85,7 @@ export function deNullify(obj) { } /** - * @template {import('@mapeo/schema').MapeoDoc & { forks: string[] }} T + * @template {import('@mapeo/schema').MapeoDoc & { forks?: string[] }} T * @param {T} doc * @returns {Omit} */ diff --git a/test-e2e/capabilities.js b/test-e2e/capabilities.js index 2acdf3f7a..984ff049c 100644 --- a/test-e2e/capabilities.js +++ b/test-e2e/capabilities.js @@ -58,10 +58,13 @@ test('New device without capabilities', async (t) => { coreStorage: () => new RAM(), }) - const projectId = await manager.addProject({ - projectKey: randomBytes(32), - encryptionKeys: { auth: randomBytes(32) }, - }) + const projectId = await manager.addProject( + { + projectKey: randomBytes(32), + encryptionKeys: { auth: randomBytes(32) }, + }, + { waitForSync: false } + ) const project = await manager.getProject(projectId) await project.ready() @@ -136,10 +139,13 @@ test('getMany() - on newly invited device before sync', async (t) => { coreStorage: () => new RAM(), }) - const projectId = await manager.addProject({ - projectKey: randomBytes(32), - encryptionKeys: { auth: randomBytes(32) }, - }) + const projectId = await manager.addProject( + { + projectKey: randomBytes(32), + encryptionKeys: { auth: randomBytes(32) }, + }, + { waitForSync: false } + ) const project = await manager.getProject(projectId) await project.ready() diff --git a/test-e2e/device-info.js b/test-e2e/device-info.js index eb3860562..0ac3db0c4 100644 --- a/test-e2e/device-info.js +++ b/test-e2e/device-info.js @@ -64,10 +64,13 @@ test('device info written to projects', (t) => { await manager.setDeviceInfo({ name: 'mapeo' }) - const projectId = await manager.addProject({ - projectKey: randomBytes(32), - encryptionKeys: { auth: randomBytes(32) }, - }) + const projectId = await manager.addProject( + { + projectKey: randomBytes(32), + encryptionKeys: { auth: randomBytes(32) }, + }, + { waitForSync: false } + ) const project = await manager.getProject(projectId) diff --git a/test-e2e/manager-basic.js b/test-e2e/manager-basic.js index 274fd5ed3..9fee8537a 100644 --- a/test-e2e/manager-basic.js +++ b/test-e2e/manager-basic.js @@ -122,17 +122,23 @@ test('Managing added projects', async (t) => { coreStorage: () => new RAM(), }) - const project1Id = await manager.addProject({ - projectKey: KeyManager.generateProjectKeypair().publicKey, - encryptionKeys: { auth: randomBytes(32) }, - projectInfo: { name: 'project 1' }, - }) + const project1Id = await manager.addProject( + { + projectKey: KeyManager.generateProjectKeypair().publicKey, + encryptionKeys: { auth: randomBytes(32) }, + projectInfo: { name: 'project 1' }, + }, + { waitForSync: false } + ) - const project2Id = await manager.addProject({ - projectKey: KeyManager.generateProjectKeypair().publicKey, - encryptionKeys: { auth: randomBytes(32) }, - projectInfo: { name: 'project 2' }, - }) + const project2Id = await manager.addProject( + { + projectKey: KeyManager.generateProjectKeypair().publicKey, + encryptionKeys: { auth: randomBytes(32) }, + projectInfo: { name: 'project 2' }, + }, + { waitForSync: false } + ) t.test('initial information from listed projects', async (st) => { const listedProjects = await manager.listProjects() @@ -194,11 +200,14 @@ test('Managing both created and added projects', async (t) => { name: 'created project', }) - const addedProjectId = await manager.addProject({ - projectKey: KeyManager.generateProjectKeypair().publicKey, - encryptionKeys: { auth: randomBytes(32) }, - projectInfo: { name: 'added project' }, - }) + const addedProjectId = await manager.addProject( + { + projectKey: KeyManager.generateProjectKeypair().publicKey, + encryptionKeys: { auth: randomBytes(32) }, + projectInfo: { name: 'added project' }, + }, + { waitForSync: false } + ) const listedProjects = await manager.listProjects() @@ -263,11 +272,14 @@ test('Consistent storage folders', async (t) => { }) for (let i = 0; i < 10; i++) { - const projectId = await manager.addProject({ - projectKey: randomBytesSeed('test' + i), - encryptionKeys: { auth: randomBytes(32) }, - projectInfo: {}, - }) + const projectId = await manager.addProject( + { + projectKey: randomBytesSeed('test' + i), + encryptionKeys: { auth: randomBytes(32) }, + projectInfo: {}, + }, + { waitForSync: false } + ) await manager.getProject(projectId) } diff --git a/test-e2e/manager-invite.js b/test-e2e/manager-invite.js index 1b92df406..83750410b 100644 --- a/test-e2e/manager-invite.js +++ b/test-e2e/manager-invite.js @@ -1,179 +1,181 @@ -import { test } from 'brittle' -import { KeyManager } from '@mapeo/crypto' -import pDefer from 'p-defer' -import RAM from 'random-access-memory' -import { MEMBER_ROLE_ID } from '../src/capabilities.js' +import { test, skip } from 'brittle' +import { COORDINATOR_ROLE_ID, MEMBER_ROLE_ID } from '../src/capabilities.js' import { InviteResponse_Decision } from '../src/generated/rpc.js' -import { MapeoManager, kRPC } from '../src/mapeo-manager.js' -import { replicate } from '../tests/helpers/local-peers.js' - -const projectMigrationsFolder = new URL('../drizzle/project', import.meta.url) - .pathname -const clientMigrationsFolder = new URL('../drizzle/client', import.meta.url) - .pathname +import { once } from 'node:events' +import { + connectPeers, + createManagers, + disconnectPeers, + waitForPeers, +} from './utils.js' test('member invite accepted', async (t) => { - t.plan(10) - - const deferred = pDefer() - - const creator = new MapeoManager({ - rootKey: KeyManager.generateRootKey(), - projectMigrationsFolder, - clientMigrationsFolder, - dbFolder: ':memory:', - coreStorage: () => new RAM(), - }) - - await creator.setDeviceInfo({ name: 'Creator' }) + const [creator, joiner] = await createManagers(2) + connectPeers([creator, joiner]) + await waitForPeers([creator, joiner]) const createdProjectId = await creator.createProject({ name: 'Mapeo' }) const creatorProject = await creator.getProject(createdProjectId) - creator[kRPC].on('peers', async (peers) => { - t.is(peers.length, 1) - - const response = await creatorProject.$member.invite(peers[0].deviceId, { - roleId: MEMBER_ROLE_ID, - }) - - t.is(response, InviteResponse_Decision.ACCEPT) - deferred.resolve() - }) - - /** @type {string | undefined} */ - let expectedInvitorPeerId - - const joiner = new MapeoManager({ - rootKey: KeyManager.generateRootKey(), - projectMigrationsFolder, - clientMigrationsFolder, - dbFolder: ':memory:', - coreStorage: () => new RAM(), - }) - - await joiner.setDeviceInfo({ name: 'Joiner' }) - - t.exception( + await t.exception( async () => joiner.getProject(createdProjectId), 'joiner cannot get project instance before being invited and added to project' ) - joiner[kRPC].on('peers', (peers) => { - t.is(peers.length, 1) - expectedInvitorPeerId = peers[0].deviceId + const responsePromise = creatorProject.$member.invite(joiner.deviceId, { + roleId: MEMBER_ROLE_ID, }) + const [invite] = await once(joiner.invite, 'invite-received') + t.is(invite.projectId, createdProjectId, 'projectId of invite matches') + t.is(invite.peerId, creator.deviceId, 'deviceId of invite matches') + t.is(invite.projectName, 'Mapeo', 'project name of invite matches') - joiner.invite.on('invite-received', async (invite) => { - t.is(invite.projectId, createdProjectId) - t.is(invite.peerId, expectedInvitorPeerId) - t.is(invite.projectName, 'Mapeo') - // TODO: Check role being invited for (needs https://github.com/digidem/mapeo-core-next/issues/275) + await joiner.invite.accept(invite.projectId) - await joiner.invite.accept(invite.projectId) - }) - - replicate(creator[kRPC], joiner[kRPC]) - - await deferred.promise + t.is( + await responsePromise, + InviteResponse_Decision.ACCEPT, + 'correct invite response' + ) /// After invite flow has completed... - const joinerListedProjects = await joiner.listProjects() - - t.is(joinerListedProjects.length, 1, 'project added to joiner') t.alike( - joinerListedProjects[0], - { - name: 'Mapeo', - projectId: createdProjectId, - createdAt: undefined, - updatedAt: undefined, - }, + await joiner.listProjects(), + await creator.listProjects(), 'project info recorded in joiner successfully' ) - const joinerProject = await joiner.getProject( - joinerListedProjects[0].projectId - ) + const joinerProject = await joiner.getProject(createdProjectId) - t.ok(joinerProject, 'can create joiner project instance') + t.alike( + await joinerProject.$getProjectSettings(), + await creatorProject.$getProjectSettings(), + 'Project settings match' + ) - // TODO: Get project settings of joiner and ensure they're similar to creator's project's settings - // const joinerProjectSettings = await joinerProject.$getProjectSettings() - // t.alike(joinerProjectSettings, { defaultPresets: undefined, name: 'Mapeo' }) + t.alike( + await creatorProject.$member.getMany(), + await joinerProject.$member.getMany(), + 'Project members match' + ) - // TODO: Get members of creator project and assert info matches joiner - // const creatorProjectMembers = await creatorProject.$member.getMany() - // t.is(creatorProjectMembers.length, 1) - // t.alike(creatorProjectMembers[0], await joiner.getDeviceInfo()) + await disconnectPeers([creator, joiner]) }) -test('member invite rejected', async (t) => { - t.plan(9) +test('chain of invites', async (t) => { + const managers = await createManagers(4) + const [creator, ...joiners] = managers + connectPeers(managers) + await waitForPeers(managers) + + const createdProjectId = await creator.createProject({ name: 'Mapeo' }) - const deferred = pDefer() + let invitor = creator + for (const joiner of joiners) { + const invitorProject = await invitor.getProject(createdProjectId) + const responsePromise = invitorProject.$member.invite(joiner.deviceId, { + roleId: COORDINATOR_ROLE_ID, + }) + const [invite] = await once(joiner.invite, 'invite-received') + await joiner.invite.accept(invite.projectId) + t.is( + await responsePromise, + InviteResponse_Decision.ACCEPT, + 'correct invite response' + ) + } - const creator = new MapeoManager({ - rootKey: KeyManager.generateRootKey(), - projectMigrationsFolder, - clientMigrationsFolder, - dbFolder: ':memory:', - coreStorage: () => new RAM(), - }) + /// After invite flow has completed... + + const creatorProject = await creator.getProject(createdProjectId) + const expectedProjectSettings = await creatorProject.$getProjectSettings() + const expectedMembers = await creatorProject.$member.getMany() + + for (const joiner of joiners) { + const joinerProject = await joiner.getProject(createdProjectId) + + t.alike( + await joinerProject.$getProjectSettings(), + expectedProjectSettings, + 'Project settings match' + ) + + const joinerMembers = await joinerProject.$member.getMany() + t.alike( + joinerMembers.sort(memberSort), + expectedMembers.sort(memberSort), + 'Project members match' + ) + } + + await disconnectPeers(managers) +}) - await creator.setDeviceInfo({ name: 'Creator' }) +// TODO: Needs fix to inviteApi to check capabilities before sending invite +skip("member can't invite", async (t) => { + const managers = await createManagers(3) + const [creator, member, joiner] = managers + connectPeers(managers) + await waitForPeers(managers) const createdProjectId = await creator.createProject({ name: 'Mapeo' }) const creatorProject = await creator.getProject(createdProjectId) - creator[kRPC].on('peers', async (peers) => { - t.is(peers.length, 1) + const responsePromise = creatorProject.$member.invite(member.deviceId, { + roleId: MEMBER_ROLE_ID, + }) + const [invite] = await once(member.invite, 'invite-received') + await member.invite.accept(invite.projectId) + await responsePromise - const response = await creatorProject.$member.invite(peers[0].deviceId, { - roleId: MEMBER_ROLE_ID, - }) + /// After invite flow has completed... - t.is(response, InviteResponse_Decision.REJECT) + const memberProject = await member.getProject(createdProjectId) - deferred.resolve() - }) + t.alike( + await memberProject.$getProjectSettings(), + await creatorProject.$getProjectSettings(), + 'Project settings match' + ) - /** @type {string | undefined} */ - let expectedInvitorPeerId + const exceptionPromise = t.exception(() => + memberProject.$member.invite(joiner.deviceId, { roleId: MEMBER_ROLE_ID }) + ) + joiner.invite.once('invite-received', () => t.fail('should not send invite')) + await exceptionPromise - const joiner = new MapeoManager({ - rootKey: KeyManager.generateRootKey(), - projectMigrationsFolder, - clientMigrationsFolder, - dbFolder: ':memory:', - coreStorage: () => new RAM(), - }) + await disconnectPeers(managers) +}) - await joiner.setDeviceInfo({ name: 'Joiner' }) +test('member invite rejected', async (t) => { + const [creator, joiner] = await createManagers(2) + connectPeers([creator, joiner]) + await waitForPeers([creator, joiner]) - t.exception( + const createdProjectId = await creator.createProject({ name: 'Mapeo' }) + const creatorProject = await creator.getProject(createdProjectId) + + await t.exception( async () => joiner.getProject(createdProjectId), 'joiner cannot get project instance before being invited and added to project' ) - joiner[kRPC].on('peers', (peers) => { - t.is(peers.length, 1) - expectedInvitorPeerId = peers[0].deviceId + const responsePromise = creatorProject.$member.invite(joiner.deviceId, { + roleId: MEMBER_ROLE_ID, }) + const [invite] = await once(joiner.invite, 'invite-received') + t.is(invite.projectId, createdProjectId, 'projectId of invite matches') + t.is(invite.peerId, creator.deviceId, 'deviceId of invite matches') + t.is(invite.projectName, 'Mapeo', 'project name of invite matches') - joiner.invite.on('invite-received', async (invite) => { - t.is(invite.projectId, createdProjectId) - t.is(invite.peerId, expectedInvitorPeerId) - t.is(invite.projectName, 'Mapeo') - // TODO: Check role being invited for (needs https://github.com/digidem/mapeo-core-next/issues/275) + await joiner.invite.reject(invite.projectId) - await joiner.invite.reject(invite.projectId) - }) - - replicate(creator[kRPC], joiner[kRPC]) - - await deferred.promise + t.is( + await responsePromise, + InviteResponse_Decision.REJECT, + 'correct invite response' + ) /// After invite flow has completed... @@ -186,7 +188,21 @@ test('member invite rejected', async (t) => { 'joiner cannot get project instance' ) - // TODO: Get members of creator project and assert joiner not added - // const creatorProjectMembers = await creatorProject.$member.getMany() - // t.is(creatorProjectMembers.length, 0) + t.is( + (await creatorProject.$member.getMany()).length, + 1, + 'Only 1 member in project still' + ) + + await disconnectPeers([creator, joiner]) }) + +/** + * @param {import('../src/member-api.js').MemberInfo} a + * @param {import('../src/member-api.js').MemberInfo} b + */ +function memberSort(a, b) { + if (a.deviceId < b.deviceId) return -1 + if (a.deviceId > b.deviceId) return 1 + return 0 +} diff --git a/test-e2e/members.js b/test-e2e/members.js index 2d4b6b653..2934a531a 100644 --- a/test-e2e/members.js +++ b/test-e2e/members.js @@ -1,27 +1,25 @@ +// @ts-check import { test } from 'brittle' -import RAM from 'random-access-memory' -import { KeyManager } from '@mapeo/crypto' -import pDefer from 'p-defer' import { randomBytes } from 'crypto' -import { MapeoManager, kRPC } from '../src/mapeo-manager.js' import { CREATOR_CAPABILITIES, DEFAULT_CAPABILITIES, MEMBER_ROLE_ID, NO_ROLE_CAPABILITIES, } from '../src/capabilities.js' -import { replicate } from '../tests/helpers/local-peers.js' - -const projectMigrationsFolder = new URL('../drizzle/project', import.meta.url) - .pathname -const clientMigrationsFolder = new URL('../drizzle/client', import.meta.url) - .pathname +import { + connectPeers, + createManagers, + disconnectPeers, + invite, + waitForPeers, +} from './utils.js' test('getting yourself after creating project', async (t) => { - const { manager } = setup() + const [manager] = await createManagers(1) - await manager.setDeviceInfo({ name: 'mapeo' }) + const deviceInfo = await manager.getDeviceInfo() const project = await manager.getProject(await manager.createProject()) await project.ready() @@ -31,7 +29,7 @@ test('getting yourself after creating project', async (t) => { me, { deviceId: project.deviceId, - name: 'mapeo', + name: deviceInfo.name, capabilities: CREATOR_CAPABILITIES, }, 'has expected member info with creator capabilities' @@ -44,22 +42,25 @@ test('getting yourself after creating project', async (t) => { members[0], { deviceId: project.deviceId, - name: 'mapeo', + name: deviceInfo.name, capabilities: CREATOR_CAPABILITIES, }, 'has expected member info with creator capabilities' ) }) -test('getting yourself after being invited to project (but not yet synced)', async (t) => { - const { manager } = setup() +test('getting yourself after adding project (but not yet synced)', async (t) => { + const [manager] = await createManagers(1) - await manager.setDeviceInfo({ name: 'mapeo' }) + const deviceInfo = await manager.getDeviceInfo() const project = await manager.getProject( - await manager.addProject({ - projectKey: randomBytes(32), - encryptionKeys: { auth: randomBytes(32) }, - }) + await manager.addProject( + { + projectKey: randomBytes(32), + encryptionKeys: { auth: randomBytes(32) }, + }, + { waitForSync: false } + ) ) await project.ready() @@ -69,7 +70,7 @@ test('getting yourself after being invited to project (but not yet synced)', asy me, { deviceId: project.deviceId, - name: 'mapeo', + name: deviceInfo.name, capabilities: NO_ROLE_CAPABILITIES, }, 'has expected member info with no role capabilities' @@ -82,7 +83,7 @@ test('getting yourself after being invited to project (but not yet synced)', asy members[0], { deviceId: project.deviceId, - name: 'mapeo', + name: deviceInfo.name, capabilities: NO_ROLE_CAPABILITIES, }, 'has expected member info with no role capabilities' @@ -90,19 +91,24 @@ test('getting yourself after being invited to project (but not yet synced)', asy }) test('getting invited member after invite rejected', async (t) => { - const { manager, simulateMemberInvite } = setup() + const managers = await createManagers(2) + const [invitor, invitee] = managers + connectPeers(managers) + await waitForPeers(managers) - await manager.setDeviceInfo({ name: 'mapeo' }) - const project = await manager.getProject(await manager.createProject()) + const projectId = await invitor.createProject() + const project = await invitor.getProject(projectId) await project.ready() - const invitedDeviceId = await simulateMemberInvite(project, 'reject', { - deviceInfo: { name: 'member' }, - roleId: MEMBER_ROLE_ID, + await invite({ + invitor, + projectId, + invitees: [invitee], + reject: true, }) await t.exception( - () => project.$member.getById(invitedDeviceId), + () => project.$member.getById(invitee.deviceId), 'invited member cannot be retrieved' ) @@ -110,107 +116,46 @@ test('getting invited member after invite rejected', async (t) => { t.is(members.length, 1) t.absent( - members.find((m) => m.deviceId === invitedDeviceId), + members.find((m) => m.deviceId === invitee.deviceId), 'invited member not found' ) + await disconnectPeers(managers) }) test('getting invited member after invite accepted', async (t) => { - const { manager, simulateMemberInvite } = setup() - - await manager.setDeviceInfo({ name: 'mapeo' }) - const project = await manager.getProject(await manager.createProject()) + const managers = await createManagers(2) + const [invitor, invitee] = managers + connectPeers(managers) + await waitForPeers(managers) + + const { name: inviteeName } = await invitee.getDeviceInfo() + const projectId = await invitor.createProject() + const project = await invitor.getProject(projectId) await project.ready() - const invitedDeviceId = await simulateMemberInvite(project, 'accept', { - deviceInfo: { name: 'member' }, + await invite({ + invitor, + projectId, + invitees: [invitee], roleId: MEMBER_ROLE_ID, }) - // Before syncing - { - const invitedMember = await project.$member.getById(invitedDeviceId) - - t.alike( - invitedMember, - { - deviceId: invitedDeviceId, - capabilities: DEFAULT_CAPABILITIES[MEMBER_ROLE_ID], - }, - 'has expected member info with member capabilities' - ) - } - - { - const members = await project.$member.getMany() + const members = await project.$member.getMany() - t.is(members.length, 2) + t.is(members.length, 2) - const invitedMember = members.find((m) => m.deviceId === invitedDeviceId) + const invitedMember = members.find((m) => m.deviceId === invitee.deviceId) - t.alike( - invitedMember, - { - deviceId: invitedDeviceId, - capabilities: DEFAULT_CAPABILITIES[MEMBER_ROLE_ID], - }, - 'has expected member info with member capabilities' - ) - } + t.alike( + invitedMember, + { + deviceId: invitee.deviceId, + name: inviteeName, + capabilities: DEFAULT_CAPABILITIES[MEMBER_ROLE_ID], + }, + 'has expected member info with member capabilities' + ) // TODO: Test that device info of invited member can be read from invitor after syncing + await disconnectPeers(managers) }) - -function setup() { - const manager = new MapeoManager({ - rootKey: KeyManager.generateRootKey(), - projectMigrationsFolder, - clientMigrationsFolder, - dbFolder: ':memory:', - coreStorage: () => new RAM(), - }) - - /** - * - * @param {import('../src/mapeo-project.js').MapeoProject} project - * @param {'accept' | 'reject'} respondWith - * @param {{ deviceInfo: import('../src/generated/rpc.js').DeviceInfo, roleId: import('../src/capabilities.js').RoleId }} mocked - * - */ - async function simulateMemberInvite( - project, - respondWith, - { deviceInfo, roleId } - ) { - /** @type {import('p-defer').DeferredPromise} */ - const deferred = pDefer() - - const otherManager = new MapeoManager({ - rootKey: KeyManager.generateRootKey(), - projectMigrationsFolder, - clientMigrationsFolder, - dbFolder: ':memory:', - coreStorage: () => new RAM(), - }) - - await otherManager.setDeviceInfo(deviceInfo) - - otherManager.invite.on('invite-received', ({ projectId }) => { - otherManager.invite[respondWith](projectId).catch(deferred.reject) - }) - - manager[kRPC].on('peers', (peers) => { - const deviceId = peers[0].deviceId - project.$member - .invite(deviceId, { roleId }) - .then(() => deferred.resolve(deviceId)) - .catch(deferred.reject) - }) - - replicate(manager[kRPC], otherManager[kRPC]) - - return deferred.promise - } - - return { manager, simulateMemberInvite } -} diff --git a/test-e2e/project-crud.js b/test-e2e/project-crud.js index 53ae1b2bf..6c25708ae 100644 --- a/test-e2e/project-crud.js +++ b/test-e2e/project-crud.js @@ -4,6 +4,8 @@ import { KeyManager } from '@mapeo/crypto' import { valueOf } from '../src/utils.js' import { MapeoManager } from '../src/mapeo-manager.js' import RAM from 'random-access-memory' +import { stripUndef } from './utils.js' +import { round } from './utils.js' /** @satisfies {Array} */ const fixtures = [ @@ -130,20 +132,3 @@ test('CRUD operations', async (t) => { }) } }) - -/** - * Remove undefined properties from an object, to allow deep comparison - * @param {object} obj - */ -function stripUndef(obj) { - return JSON.parse(JSON.stringify(obj)) -} - -/** - * - * @param {number} value - * @param {number} decimalPlaces - */ -function round(value, decimalPlaces) { - return Math.round(value * 10 ** decimalPlaces) / 10 ** decimalPlaces -} diff --git a/test-e2e/sync.js b/test-e2e/sync.js new file mode 100644 index 000000000..20abb7e58 --- /dev/null +++ b/test-e2e/sync.js @@ -0,0 +1,256 @@ +import { test } from 'brittle' +import { + connectPeers, + createManagers, + invite, + seedDatabases, + sortById, + waitForSync, +} from './utils.js' +import { kCoreManager } from '../src/mapeo-project.js' +import { getKeys } from '../tests/helpers/core-manager.js' +import { NAMESPACES } from '../src/core-manager/index.js' +import { PRESYNC_NAMESPACES } from '../src/sync/peer-sync-controller.js' +import { generate } from '@mapeo/mock-data' +import { valueOf } from '../src/utils.js' +import pTimeout from 'p-timeout' +import { BLOCKED_ROLE_ID, COORDINATOR_ROLE_ID } from '../src/capabilities.js' + +const SCHEMAS_INITIAL_SYNC = ['preset', 'field'] + +test('Create and sync data', async function (t) { + const COUNT = 5 + const managers = await createManagers(COUNT) + const [invitor, ...invitees] = managers + const disconnect = connectPeers(managers, { discovery: false }) + const projectId = await invitor.createProject() + await invite({ invitor, invitees, projectId }) + await disconnect() + + const projects = await Promise.all( + managers.map((m) => m.getProject(projectId)) + ) + + const generatedDocs = (await seedDatabases(projects)).flat() + t.pass(`Generated ${generatedDocs.length} values`) + const generatedSchemaNames = generatedDocs.reduce((acc, cur) => { + acc.add(cur.schemaName) + return acc + }, new Set()) + + connectPeers(managers, { discovery: false }) + await waitForSync(projects, 'initial') + + for (const schemaName of generatedSchemaNames) { + for (const project of projects) { + const deviceId = project.deviceId.slice(0, 7) + // @ts-ignore - to complex to narrow `schemaName` to valid values + const docs = await project[schemaName].getMany() + const expected = generatedDocs.filter((v) => v.schemaName === schemaName) + if (SCHEMAS_INITIAL_SYNC.includes(schemaName)) { + t.alike( + sortById(docs), + sortById(expected), + `All ${schemaName} docs synced to ${deviceId}` + ) + } else { + t.not( + docs.length, + expected.length, + `Not all ${schemaName} docs synced to ${deviceId}` + ) + } + } + } + + for (const project of projects) { + project.$sync.start() + } + + await waitForSync(projects, 'full') + + for (const schemaName of generatedSchemaNames) { + for (const project of projects) { + const deviceId = project.deviceId.slice(0, 7) + // @ts-ignore - to complex to narrow `schemaName` to valid values + const docs = await project[schemaName].getMany() + const expected = generatedDocs.filter((v) => v.schemaName === schemaName) + t.alike( + sortById(docs), + sortById(expected), + `All ${schemaName} docs synced to ${deviceId}` + ) + } + } +}) + +test('start and stop sync', async function (t) { + // Checks that both peers need to start syncing for data to sync, and that + // $sync.stop() actually stops data syncing + const COUNT = 2 + const managers = await createManagers(COUNT) + const [invitor, ...invitees] = managers + const disconnect = connectPeers(managers, { discovery: false }) + const projectId = await invitor.createProject() + await invite({ invitor, invitees, projectId }) + + const projects = await Promise.all( + managers.map((m) => m.getProject(projectId)) + ) + const [invitorProject, inviteeProject] = projects + + const obs1 = await invitorProject.observation.create( + valueOf(generate('observation')[0]) + ) + await waitForSync(projects, 'initial') + inviteeProject.$sync.start() + + await t.exception( + () => pTimeout(waitForSync(projects, 'full'), { milliseconds: 1000 }), + 'wait for sync times out' + ) + + await t.exception( + () => inviteeProject.observation.getByDocId(obs1.docId), + 'before both peers have started sync, doc does not sync' + ) + + invitorProject.$sync.start() + + // Use the same timeout as above, to check that it would have synced given the timeout + await pTimeout(waitForSync(projects, 'full'), { milliseconds: 1000 }) + + const obs1Synced = await inviteeProject.observation.getByDocId(obs1.docId) + + t.alike(obs1Synced, obs1, 'observation is synced') + + inviteeProject.$sync.stop() + + const obs2 = await inviteeProject.observation.create( + valueOf(generate('observation')[0]) + ) + await waitForSync(projects, 'initial') + + await t.exception( + () => pTimeout(waitForSync(projects, 'full'), { milliseconds: 1000 }), + 'wait for sync times out' + ) + + await t.exception( + () => invitorProject.observation.getByDocId(obs2.docId), + 'after stopping sync, data does not sync' + ) + + inviteeProject.$sync.start() + + await pTimeout(waitForSync(projects, 'full'), { milliseconds: 1000 }) + + const obs2Synced = await invitorProject.observation.getByDocId(obs2.docId) + + t.alike(obs2Synced, obs2, 'observation is synced') + + await disconnect() +}) + +test('shares cores', async function (t) { + const COUNT = 5 + const managers = await createManagers(COUNT) + const [invitor, ...invitees] = managers + connectPeers(managers, { discovery: false }) + const projectId = await invitor.createProject() + await invite({ invitor, invitees, projectId }) + + const projects = await Promise.all( + managers.map((m) => m.getProject(projectId)) + ) + const coreManagers = projects.map((p) => p[kCoreManager]) + + await waitForSync(projects, 'initial') + + for (const ns of PRESYNC_NAMESPACES) { + for (const cm of coreManagers) { + const keyCount = getKeys(cm, ns).length + t.is(keyCount, COUNT, 'expected number of cores') + } + } + + // Currently need to start syncing to share other keys - this might change if + // we add keys based on coreOwnership records + for (const project of projects) { + project.$sync.start() + } + + await waitForSync(projects, 'full') + + for (const ns of NAMESPACES) { + for (const cm of coreManagers) { + const keyCount = getKeys(cm, ns).length + t.is(keyCount, COUNT, 'expected number of cores') + } + } +}) + +test('no sync capabilities === no namespaces sync apart from auth', async (t) => { + const COUNT = 3 + const managers = await createManagers(COUNT) + const [invitor, invitee, blocked] = managers + const disconnect1 = connectPeers(managers, { discovery: false }) + const projectId = await invitor.createProject() + await invite({ + invitor, + invitees: [blocked], + projectId, + roleId: BLOCKED_ROLE_ID, + }) + await invite({ + invitor, + invitees: [invitee], + projectId, + roleId: COORDINATOR_ROLE_ID, + }) + + const projects = await Promise.all( + managers.map((m) => m.getProject(projectId)) + ) + const [invitorProject, inviteeProject] = projects + + const generatedDocs = (await seedDatabases([inviteeProject])).flat() + const configDocsCount = generatedDocs.filter( + (doc) => doc.schemaName !== 'observation' + ).length + const dataDocsCount = generatedDocs.length - configDocsCount + + for (const project of projects) { + project.$sync.start() + } + + await waitForSync([inviteeProject, invitorProject], 'full') + + const [invitorState, inviteeState, blockedState] = projects.map((p) => + p.$sync.getState() + ) + + t.is(invitorState.config.localState.have, configDocsCount + COUNT) // count device info doc for each invited device + t.is(invitorState.data.localState.have, dataDocsCount) + t.is(blockedState.config.localState.have, 1) // just the device info doc + t.is(blockedState.data.localState.have, 0) // no data docs synced + + for (const ns of NAMESPACES) { + if (ns === 'auth') { + t.is(invitorState[ns].coreCount, 3) + t.is(inviteeState[ns].coreCount, 3) + t.is(blockedState[ns].coreCount, 3) + } else if (PRESYNC_NAMESPACES.includes(ns)) { + t.is(invitorState[ns].coreCount, 3) + t.is(inviteeState[ns].coreCount, 3) + t.is(blockedState[ns].coreCount, 1) + } else { + t.is(invitorState[ns].coreCount, 2) + t.is(inviteeState[ns].coreCount, 2) + t.is(blockedState[ns].coreCount, 1) + } + t.alike(invitorState[ns].localState, inviteeState[ns].localState) + } + + await disconnect1() +}) diff --git a/test-e2e/utils.js b/test-e2e/utils.js index 04734e8f7..6c9e8bc98 100644 --- a/test-e2e/utils.js +++ b/test-e2e/utils.js @@ -1,3 +1,305 @@ +// @ts-check +import sodium from 'sodium-universal' +import RAM from 'random-access-memory' + +import { MapeoManager } from '../src/index.js' +import { kManagerReplicate, kRPC } from '../src/mapeo-manager.js' +import { MEMBER_ROLE_ID } from '../src/capabilities.js' +import { once } from 'node:events' +import { generate } from '@mapeo/mock-data' +import { valueOf } from '../src/utils.js' +import { randomInt } from 'node:crypto' + +const projectMigrationsFolder = new URL('../drizzle/project', import.meta.url) + .pathname +const clientMigrationsFolder = new URL('../drizzle/client', import.meta.url) + .pathname + +/** + * @param {readonly MapeoManager[]} managers + */ +export async function disconnectPeers(managers) { + return Promise.all( + managers.map(async (manager) => { + return manager.stopLocalPeerDiscovery({ force: true }) + }) + ) +} + +/** + * @param {readonly MapeoManager[]} managers + */ +export function connectPeers(managers, { discovery = true } = {}) { + if (discovery) { + for (const manager of managers) { + manager.startLocalPeerDiscovery() + } + return function destroy() { + return disconnectPeers(managers) + } + } else { + /** @type {import('../src/types.js').ReplicationStream[]} */ + const replicationStreams = [] + for (let i = 0; i < managers.length; i++) { + for (let j = i + 1; j < managers.length; j++) { + const r1 = managers[i][kManagerReplicate](true) + const r2 = managers[j][kManagerReplicate](false) + replicationStreams.push(r1, r2) + r1.pipe(r2).pipe(r1) + } + } + return function destroy() { + const promises = [] + for (const stream of replicationStreams) { + promises.push( + /** @type {Promise} */ + ( + new Promise((res) => { + stream.on('close', res) + stream.destroy() + }) + ) + ) + } + return Promise.all(promises) + } + } +} + +/** + * Invite mapeo clients to a project + * + * @param {{ + * invitor: MapeoManager, + * projectId: string, + * invitees: MapeoManager[], + * roleId?: import('../src/capabilities.js').RoleId, + * reject?: boolean + * }} opts + */ +export async function invite({ + invitor, + projectId, + invitees, + roleId = MEMBER_ROLE_ID, + reject = false, +}) { + const invitorProject = await invitor.getProject(projectId) + const promises = [] + + for (const invitee of invitees) { + promises.push( + invitorProject.$member.invite(invitee.deviceId, { + roleId, + }) + ) + promises.push( + once(invitee.invite, 'invite-received').then(([invite]) => { + return reject + ? invitee.invite.reject(invite.projectId) + : invitee.invite.accept(invite.projectId) + }) + ) + } + + await Promise.allSettled(promises) +} + +/** + * Waits for all manager instances to be connected to each other + * + * @param {readonly MapeoManager[]} managers + */ +export async function waitForPeers(managers) { + const peerCounts = managers.map((manager) => { + return manager[kRPC].peers.filter(({ status }) => status === 'connected') + .length + }) + const expectedCount = managers.length - 1 + return new Promise((res) => { + if (peerCounts.every((v) => v === expectedCount)) { + return res(null) + } + for (const [idx, manager] of managers.entries()) { + manager.on('local-peers', function onPeers(peers) { + const connectedPeerCount = peers.filter( + ({ status }) => status === 'connected' + ).length + peerCounts[idx] = connectedPeerCount + if (connectedPeerCount === expectedCount) { + manager.off('local-peers', onPeers) + } + if (peerCounts.every((v) => v === expectedCount)) { + res(null) + } + }) + } + }) +} + +/** + * Create `count` manager instances. Each instance has a deterministic identity + * keypair so device IDs should be consistent between tests. + * + * @template {number} T + * @param {T} count + * @returns {Promise>} + */ +export async function createManagers(count) { + // @ts-ignore + return Promise.all( + Array(count) + .fill(null) + .map(async (_, i) => { + const name = 'device' + i + const manager = createManager(name) + await manager.setDeviceInfo({ name }) + return manager + }) + ) +} + +/** @param {string} [seed] */ +export function createManager(seed) { + return new MapeoManager({ + rootKey: getRootKey(seed), + projectMigrationsFolder, + clientMigrationsFolder, + dbFolder: ':memory:', + coreStorage: () => new RAM(), + }) +} + +/** @param {string} [seed] */ +function getRootKey(seed) { + const key = Buffer.allocUnsafe(16) + if (!seed) { + sodium.randombytes_buf(key) + } else { + const seedBuf = Buffer.alloc(32) + sodium.crypto_generichash(seedBuf, Buffer.from(seed)) + sodium.randombytes_buf_deterministic(key, seedBuf) + } + return key +} +/** + * Remove undefined properties from an object, to allow deep comparison + * @param {object} obj + */ +export function stripUndef(obj) { + return JSON.parse(JSON.stringify(obj)) +} +/** + * + * @param {number} value + * @param {number} decimalPlaces + */ +export function round(value, decimalPlaces) { + return Math.round(value * 10 ** decimalPlaces) / 10 ** decimalPlaces +} + +/** + * Unlike `mapeo.project.$sync.waitForSync` this also waits for the specified + * number of peers to connect. + * + * @param {import('../src/mapeo-project.js').MapeoProject} project + * @param {string[]} peerIds + * @param {'initial' | 'full'} [type] + */ +async function waitForProjectSync(project, peerIds, type = 'initial') { + const state = await project.$sync.getState() + if (hasPeerIds(state.auth.remoteStates, peerIds)) { + return project.$sync.waitForSync(type) + } + return new Promise((res) => { + project.$sync.on('sync-state', function onState(state) { + if (!hasPeerIds(state.auth.remoteStates, peerIds)) return + project.$sync.off('sync-state', onState) + res(project.$sync.waitForSync(type)) + }) + }) +} + +/** + * @param {Record} remoteStates + * @param {string[]} peerIds + * @returns + */ +function hasPeerIds(remoteStates, peerIds) { + for (const peerId of peerIds) { + if (!(peerId in remoteStates)) return false + } + return true +} + +/** + * Wait for all projects to connect and sync + * + * @param {import('../src/mapeo-project.js').MapeoProject[]} projects + * @param {'initial' | 'full'} [type] + */ +export function waitForSync(projects, type = 'initial') { + return Promise.all( + projects.map((project) => { + const peerIds = projects + .filter((p) => p !== project) + .map((p) => p.deviceId) + return waitForProjectSync(project, peerIds, type) + }) + ) +} + +/** + * @param {import('../src/mapeo-project.js').MapeoProject[]} projects + */ +export function seedDatabases(projects) { + return Promise.all(projects.map((p) => seedProjectDatabase(p))) +} + +const SCHEMAS_TO_SEED = /** @type {const} */ ([ + 'observation', + 'preset', + 'field', +]) + +/** + * @param {import('../src/mapeo-project.js').MapeoProject} project + * @returns {Promise>} + */ +async function seedProjectDatabase(project) { + const promises = [] + for (const schemaName of SCHEMAS_TO_SEED) { + const count = + schemaName === 'observation' ? randomInt(20, 100) : randomInt(0, 10) + let i = 0 + while (i++ < count) { + const value = valueOf(generate(schemaName)[0]) + promises.push( + // @ts-ignore + project[schemaName].create(value) + ) + } + } + return Promise.all(promises) +} + +/** + * @template {object} T + * @param {T[]} arr + * @param {keyof T} key + */ +export function sortBy(arr, key) { + return arr.sort(function (a, b) { + if (a[key] < b[key]) return -1 + if (a[key] > b[key]) return 1 + return 0 + }) +} + +/** @param {import('@mapeo/schema').MapeoDoc[]} docs */ +export function sortById(docs) { + return sortBy(docs, 'docId') +} /** * Lazy way of removing fields with undefined values from an object * @param {unknown} object diff --git a/tests/blob-store/blob-store.js b/tests/blob-store/blob-store.js index 7dc5558cb..c15cd6fc8 100644 --- a/tests/blob-store/blob-store.js +++ b/tests/blob-store/blob-store.js @@ -5,10 +5,14 @@ import { pipelinePromise as pipeline } from 'streamx' import { randomBytes } from 'node:crypto' import fs from 'fs' import { readFile } from 'fs/promises' -import { createCoreManager, waitForCores } from '../helpers/core-manager.js' +import { + replicate, + createCoreManager, + waitForCores, +} from '../helpers/core-manager.js' import { BlobStore } from '../../src/blob-store/index.js' import { setTimeout } from 'node:timers/promises' -import { replicateBlobs, concat } from '../helpers/blob-store.js' +import { concat } from '../helpers/blob-store.js' import { discoveryKey } from 'hypercore-crypto' // Test with buffers that are 3 times the default blockSize for hyperblobs @@ -86,11 +90,13 @@ test('get(), initialized but unreplicated drive', async (t) => { }) const driveId = await bs1.put(blob1Id, blob1) - const { destroy } = replicateBlobs(cm1, cm2) + const { destroy } = replicate(cm1, cm2) await waitForCores(cm2, [cm1.getWriterCore('blobIndex').key]) /** @type {any} */ - const replicatedCore = cm2.getCoreByDiscoveryKey(Buffer.from(driveId, 'hex')) + const { core: replicatedCore } = cm2.getCoreByDiscoveryKey( + Buffer.from(driveId, 'hex') + ) await replicatedCore.update({ wait: true }) await destroy() t.is(replicatedCore.contiguousLength, 0, 'data is not downloaded') @@ -111,10 +117,12 @@ test('get(), replicated blobIndex, but blobs not replicated', async (t) => { }) const driveId = await bs1.put(blob1Id, blob1) - const { destroy } = replicateBlobs(cm1, cm2) + const { destroy } = replicate(cm1, cm2) await waitForCores(cm2, [cm1.getWriterCore('blobIndex').key]) /** @type {any} */ - const replicatedCore = cm2.getCoreByDiscoveryKey(Buffer.from(driveId, 'hex')) + const { core: replicatedCore } = cm2.getCoreByDiscoveryKey( + Buffer.from(driveId, 'hex') + ) await replicatedCore.update({ wait: true }) await replicatedCore.download({ end: replicatedCore.length }).done() await destroy() @@ -273,13 +281,13 @@ test('live download', async function (t) { // STEP 1: Write a blob to CM1 const driveId1 = await bs1.put(blob1Id, blob1) // STEP 2: Replicate CM1 with CM3 - const { destroy: destroy1 } = replicateBlobs(cm1, cm3) + const { destroy: destroy1 } = replicate(cm1, cm3) // STEP 3: Start live download to CM3 const liveDownload = bs3.download() // STEP 4: Wait for blobs to be downloaded await downloaded(liveDownload) // STEP 5: Replicate CM2 with CM3 - const { destroy: destroy2 } = replicateBlobs(cm2, cm3) + const { destroy: destroy2 } = replicate(cm2, cm3) // STEP 6: Write a blob to CM2 const driveId2 = await bs2.put(blob2Id, blob2) // STEP 7: Wait for blobs to be downloaded @@ -328,7 +336,7 @@ test('sparse live download', async function (t) { await bs1.put(blob2Id, blob2) await bs1.put(blob3Id, blob3) - const { destroy } = replicateBlobs(cm1, cm2) + const { destroy } = replicate(cm1, cm2) const liveDownload = bs2.download({ photo: ['original', 'preview'] }) await downloaded(liveDownload) @@ -365,7 +373,7 @@ test('cancelled live download', async function (t) { // STEP 1: Write a blob to CM1 const driveId1 = await bs1.put(blob1Id, blob1) // STEP 2: Replicate CM1 with CM3 - const { destroy: destroy1 } = replicateBlobs(cm1, cm3) + const { destroy: destroy1 } = replicate(cm1, cm3) // STEP 3: Start live download to CM3 const ac = new AbortController() const liveDownload = bs3.download(undefined, { signal: ac.signal }) @@ -374,7 +382,7 @@ test('cancelled live download', async function (t) { // STEP 5: Cancel download ac.abort() // STEP 6: Replicate CM2 with CM3 - const { destroy: destroy2 } = replicateBlobs(cm2, cm3) + const { destroy: destroy2 } = replicate(cm2, cm3) // STEP 7: Write a blob to CM2 const driveId2 = await bs2.put(blob2Id, blob2) // STEP 8: Wait for blobs to (not) download diff --git a/tests/core-manager.js b/tests/core-manager.js index cb7e2b2cf..5be685fcd 100644 --- a/tests/core-manager.js +++ b/tests/core-manager.js @@ -6,7 +6,10 @@ import { createCoreManager, replicate } from './helpers/core-manager.js' import { randomBytes } from 'crypto' import Sqlite from 'better-sqlite3' import { KeyManager } from '@mapeo/crypto' -import { CoreManager, unreplicate } from '../src/core-manager/index.js' +import { + CoreManager, + kCoreManagerReplicate, +} from '../src/core-manager/index.js' import RemoteBitfield from '../src/core-manager/remote-bitfield.js' import assert from 'assert' import { once } from 'node:events' @@ -15,8 +18,8 @@ import { exec } from 'child_process' import { RandomAccessFilePool } from '../src/core-manager/random-access-file-pool.js' import RandomAccessFile from 'random-access-file' import path from 'path' -import { setTimeout as delay } from 'node:timers/promises' import { Transform } from 'streamx' +import { waitForCores } from './helpers/core-manager.js' async function createCore(key) { const core = new Hypercore(RAM, key) @@ -24,76 +27,6 @@ async function createCore(key) { return core } -test('shares auth cores', async function (t) { - const projectKey = randomBytes(32) - const cm1 = createCoreManager({ projectKey }) - const cm2 = createCoreManager({ projectKey }) - - replicate(cm1, cm2) - - await Promise.all([ - waitForCores(cm1, getKeys(cm2, 'auth')), - waitForCores(cm2, getKeys(cm1, 'auth')), - ]) - - const cm1Keys = getKeys(cm1, 'auth').sort(Buffer.compare) - const cm2Keys = getKeys(cm2, 'auth').sort(Buffer.compare) - - t.alike(cm1Keys, cm2Keys, 'Share same auth cores') -}) - -test('shares other cores', async function (t) { - const projectKey = randomBytes(32) - const cm1 = createCoreManager({ projectKey }) - const cm2 = createCoreManager({ projectKey }) - - const { - rsm: [rsm1, rsm2], - } = replicate(cm1, cm2) - - for (const namespace of ['config', 'data', 'blob', 'blobIndex']) { - rsm1.enableNamespace(namespace) - rsm2.enableNamespace(namespace) - await Promise.all([ - waitForCores(cm1, getKeys(cm2, namespace)), - waitForCores(cm2, getKeys(cm1, namespace)), - ]) - const cm1Keys = getKeys(cm1, namespace).sort(Buffer.compare) - const cm2Keys = getKeys(cm2, namespace).sort(Buffer.compare) - - t.alike(cm1Keys, cm2Keys, `Share same ${namespace} cores`) - } -}) - -// Testing this case because in real-use namespaces are not enabled at the same time -test('shares cores if namespaces enabled at different times', async function (t) { - const projectKey = randomBytes(32) - const cm1 = createCoreManager({ projectKey }) - const cm2 = createCoreManager({ projectKey }) - - const { - rsm: [rsm1, rsm2], - } = replicate(cm1, cm2) - - for (const namespace of ['config', 'data', 'blob', 'blobIndex']) { - rsm1.enableNamespace(namespace) - } - - await delay(1000) - - for (const namespace of ['config', 'data', 'blob', 'blobIndex']) { - rsm2.enableNamespace(namespace) - await Promise.all([ - waitForCores(cm1, getKeys(cm2, namespace)), - waitForCores(cm2, getKeys(cm1, namespace)), - ]) - const cm1Keys = getKeys(cm1, namespace).sort(Buffer.compare) - const cm2Keys = getKeys(cm2, namespace).sort(Buffer.compare) - - t.alike(cm1Keys, cm2Keys, `Share same ${namespace} cores`) - } -}) - test('project creator auth core has project key', async function (t) { const sqlite = new Sqlite(':memory:') const keyManager = new KeyManager(randomBytes(16)) @@ -167,7 +100,6 @@ test('eagerly updates remote bitfields', async function (t) { .done() await destroyReplication() await cm1Core.clear(0, 2) - { // This is ensuring that bitfields also get propogated in the other // direction, e.g. from the non-writer to the writer @@ -224,62 +156,6 @@ test('eagerly updates remote bitfields', async function (t) { } }) -test('works with an existing protocol stream for replications', async function (t) { - const projectKey = randomBytes(32) - const cm1 = createCoreManager({ projectKey }) - const cm2 = createCoreManager({ projectKey }) - - const n1 = new NoiseSecretStream(true) - const n2 = new NoiseSecretStream(false) - n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream) - - const s1 = Hypercore.createProtocolStream(n1) - const s2 = Hypercore.createProtocolStream(n2) - - cm1.replicate(s1) - cm2.replicate(s2) - - await Promise.all([ - waitForCores(cm1, getKeys(cm2, 'auth')), - waitForCores(cm2, getKeys(cm1, 'auth')), - ]) - - const cm1Keys = getKeys(cm1, 'auth').sort(Buffer.compare) - const cm2Keys = getKeys(cm2, 'auth').sort(Buffer.compare) - - t.alike(cm1Keys, cm2Keys, 'Share same auth cores') -}) - -test.skip('can mux other project replications over same stream', async function (t) { - // This test fails because https://github.com/holepunchto/corestore/issues/45 - // The `ondiscoverykey` hook for `Hypercore.createProtocolStream()` that we - // use to know when other cores are muxed in the stream is only called the - // first time the protocol stream is created. When a second core replicates - // to the same stream, it sees it is already a protomux stream, and it does - // not add the notify hook for `ondiscoverykey`. - // We might be able to work around this if we want to enable multi-project - // muxing before the issue is resolved by creating the protomux stream outside - // the core manager, and then somehow hooking into the relevant corestore. - t.plan(2) - const projectKey = randomBytes(32) - const cm1 = createCoreManager({ projectKey }) - const cm2 = createCoreManager({ projectKey }) - const otherProject = createCoreManager() - - const n1 = new NoiseSecretStream(true) - const n2 = new NoiseSecretStream(false) - n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream) - - await Promise.all([ - waitForCores(cm1, getKeys(cm2, 'auth')), - waitForCores(cm2, getKeys(cm1, 'auth')), - ]) - - cm1.replicate(n1) - otherProject.replicate(n2) - cm2.replicate(n2) -}) - test('multiplexing waits for cores to be added', async function (t) { // Mapeo code expects replication to work when cores are not added to the // replication stream at the same time. This is not explicitly tested in @@ -326,8 +202,6 @@ test('close()', async (t) => { t.is(core.sessions.length, 0, 'no open sessions') } } - const ns = new NoiseSecretStream(true) - t.exception(() => cm.replicate(ns), /closed/) }) test('Added cores are persisted', async (t) => { @@ -365,14 +239,8 @@ test('encryption', async function (t) { const cm2 = createCoreManager({ projectKey }) const cm3 = createCoreManager({ projectKey, encryptionKeys }) - const { rsm: rsm1 } = replicate(cm1, cm2) - const { rsm: rsm2 } = replicate(cm1, cm3) - - for (const rsm of [...rsm1, ...rsm2]) { - for (const ns of CoreManager.namespaces) { - rsm.enableNamespace(ns) - } - } + replicate(cm1, cm2) + replicate(cm1, cm3) for (const ns of CoreManager.namespaces) { const { core, key } = cm1.getWriterCore(ns) @@ -442,39 +310,6 @@ test('poolSize limits number of open file descriptors', async function (t) { }) }) -async function waitForCores(coreManager, keys) { - const allKeys = getAllKeys(coreManager) - if (hasKeys(keys, allKeys)) return - return new Promise((res) => { - coreManager.on('add-core', function onAddCore({ key }) { - allKeys.push(key) - if (hasKeys(keys, allKeys)) { - coreManager.off('add-core', onAddCore) - res() - } - }) - }) -} - -function getAllKeys(coreManager) { - const keys = [] - for (const namespace of CoreManager.namespaces) { - keys.push.apply(keys, getKeys(coreManager, namespace)) - } - return keys -} - -function getKeys(coreManager, namespace) { - return coreManager.getCores(namespace).map(({ key }) => key) -} - -function hasKeys(someKeys, allKeys) { - for (const key of someKeys) { - if (!allKeys.find((k) => k.equals(key))) return false - } - return true -} - test('sends "haves" bitfields over project creator core replication stream', async function (t) { const projectKey = randomBytes(32) const cm1 = createCoreManager({ projectKey }) @@ -510,8 +345,8 @@ test('sends "haves" bitfields over project creator core replication stream', asy const cm1Core = cm1.getWriterCore('data').core await cm1Core.ready() const batchSize = 4096 - // Create 1 million entries in hypercore - for (let i = 0; i < 2 ** 20; i += batchSize) { + // Create 4 million entries in hypercore - will be at least two have bitfields + for (let i = 0; i < 2 ** 22; i += batchSize) { const data = Array(batchSize) .fill(null) .map(() => 'block') @@ -525,8 +360,8 @@ test('sends "haves" bitfields over project creator core replication stream', asy const n2 = new NoiseSecretStream(false) n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream) - cm1.replicate(n1) - cm2.replicate(n2) + cm1[kCoreManagerReplicate](n1) + cm2[kCoreManagerReplicate](n2) // Need to wait for now, since no event for when a remote bitfield is updated await new Promise((res) => setTimeout(res, 200)) @@ -622,40 +457,6 @@ test('unreplicate', async (t) => { }) }) -test('disableNamespace and re-enable', async (t) => { - const projectKey = randomBytes(32) - const cm1 = createCoreManager({ projectKey }) - const cm2 = createCoreManager({ projectKey }) - - const { - rsm: [rsm1, rsm2], - } = replicate(cm1, cm2) - - rsm1.enableNamespace('data') - rsm2.enableNamespace('data') - - await Promise.all([ - waitForCores(cm1, getKeys(cm2, 'data')), - waitForCores(cm2, getKeys(cm1, 'data')), - ]) - - const data1CR = cm1.getWriterCore('data') - await data1CR.core.append(['a', 'b', 'c']) - - const data1ReplicaCore = cm2.getCoreByKey(data1CR.key) - t.is((await data1ReplicaCore.get(2, { timeout: 200 })).toString(), 'c') - - rsm1.disableNamespace('data') - - await data1CR.core.append(['d', 'e', 'f']) - - await t.exception(() => data1ReplicaCore.get(5, { timeout: 200 })) - - rsm1.enableNamespace('data') - - t.is((await data1ReplicaCore.get(5, { timeout: 200 })).toString(), 'f') -}) - const DEBUG = process.env.DEBUG // Compare two bitfields (instance of core.core.bitfield or peer.remoteBitfield) @@ -728,3 +529,17 @@ function latencyStream(delay = 0) { }, }) } + +/** + * + * @param {Hypercore<'binary', any>} core + * @param {import('protomux')} protomux + */ +export function unreplicate(core, protomux) { + const peerToUnreplicate = core.peers.find( + (peer) => peer.protomux === protomux + ) + if (!peerToUnreplicate) return + peerToUnreplicate.channel.close() + return +} diff --git a/tests/data-type.js b/tests/data-type.js index 14940bb28..2194f9cd9 100644 --- a/tests/data-type.js +++ b/tests/data-type.js @@ -84,7 +84,7 @@ test('test validity of `createdBy` field from another peer', async (t) => { const obs = await dt1.create(obsFixture) const driveId = ds1.writerCore.key - const { destroy } = replicateDataStore(cm1, cm2) + const { destroy } = replicate(cm1, cm2) await waitForCores(cm2, [driveId]) const replicatedCore = cm2.getCoreByKey(driveId) await replicatedCore.update({ wait: true }) @@ -135,17 +135,3 @@ async function testenv(opts) { return { coreManager, dataType, dataStore } } - -function replicateDataStore(cm1, cm2) { - const { - rsm: [rsm1, rsm2], - destroy, - } = replicate(cm1, cm2) - - rsm1.enableNamespace('data') - rsm2.enableNamespace('data') - return { - rsm: /** @type {const} */ ([rsm1, rsm2]), - destroy, - } -} diff --git a/tests/fastify-plugins/blobs.js b/tests/fastify-plugins/blobs.js index 752818aef..84b257ddc 100644 --- a/tests/fastify-plugins/blobs.js +++ b/tests/fastify-plugins/blobs.js @@ -9,8 +9,11 @@ import fastify from 'fastify' import { BlobStore } from '../../src/blob-store/index.js' import BlobServerPlugin from '../../src/fastify-plugins/blobs.js' import { projectKeyToPublicId } from '../../src/utils.js' -import { replicateBlobs } from '../helpers/blob-store.js' -import { createCoreManager, waitForCores } from '../helpers/core-manager.js' +import { + createCoreManager, + waitForCores, + replicate, +} from '../helpers/core-manager.js' test('Plugin throws error if missing getBlobStore option', async (t) => { const server = fastify() @@ -224,12 +227,12 @@ test('GET photo returns 404 when trying to get non-replicated blob', async (t) = const [{ blobId }] = data - const { destroy } = replicateBlobs(cm1, cm2) + const { destroy } = replicate(cm1, cm2) await waitForCores(cm2, [cm1.getWriterCore('blobIndex').key]) /** @type {any}*/ - const replicatedCore = cm2.getCoreByDiscoveryKey( + const { core: replicatedCore } = cm2.getCoreByDiscoveryKey( Buffer.from(blobId.driveId, 'hex') ) await replicatedCore.update({ wait: true }) diff --git a/tests/helpers/blob-store.js b/tests/helpers/blob-store.js index 2f49dd305..a56b248c4 100644 --- a/tests/helpers/blob-store.js +++ b/tests/helpers/blob-store.js @@ -1,5 +1,4 @@ // @ts-nocheck -import { replicate } from './core-manager.js' import { pipelinePromise as pipeline, Writable } from 'streamx' import { BlobStore } from '../../src/blob-store/index.js' @@ -16,26 +15,6 @@ export function createBlobStore(options = {}) { return { blobStore, coreManager } } -/** - * - * @param {import('../../src/core-manager/index.js').CoreManager} cm1 - * @param {import('../../src/core-manager/index.js').CoreManager} cm2 - */ -export function replicateBlobs(cm1, cm2) { - const { - rsm: [rsm1, rsm2], - destroy, - } = replicate(cm1, cm2) - rsm1.enableNamespace('blobIndex') - rsm1.enableNamespace('blob') - rsm2.enableNamespace('blobIndex') - rsm2.enableNamespace('blob') - return { - rsm: /** @type {const} */ ([rsm1, rsm2]), - destroy, - } -} - /** * @param {*} rs * @returns {Promise} diff --git a/tests/helpers/core-manager.js b/tests/helpers/core-manager.js index d7b2a5261..a4663fab4 100644 --- a/tests/helpers/core-manager.js +++ b/tests/helpers/core-manager.js @@ -1,11 +1,13 @@ // @ts-nocheck -import { CoreManager } from '../../src/core-manager/index.js' +import { + CoreManager, + kCoreManagerReplicate, +} from '../../src/core-manager/index.js' import Sqlite from 'better-sqlite3' import { randomBytes } from 'crypto' import { KeyManager } from '@mapeo/crypto' import RAM from 'random-access-memory' import NoiseSecretStream from '@hyperswarm/secret-stream' - /** * * @param {Partial[0]> & { rootKey?: Buffer }} param0 @@ -23,6 +25,7 @@ export function createCoreManager({ keyManager, storage: RAM, projectKey, + autoDownload: false, ...opts, }) } @@ -43,8 +46,8 @@ export function replicate( const n2 = new NoiseSecretStream(false, undefined, { keyPair: kp2 }) n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream) - const rsm1 = cm1.replicate(n1) - const rsm2 = cm2.replicate(n2) + cm1[kCoreManagerReplicate](n1) + cm2[kCoreManagerReplicate](n2) async function destroy() { await Promise.all([ @@ -60,7 +63,6 @@ export function replicate( } return { - rsm: [rsm1, rsm2], destroy, } } diff --git a/tests/local-peers.js b/tests/local-peers.js index 084b7e8ba..9b151c7e7 100644 --- a/tests/local-peers.js +++ b/tests/local-peers.js @@ -109,7 +109,7 @@ test('Send invite, duplicate connections', async (t) => { t.is(peers3.length, 1) t.ok( peers3[0].connectedAt > peers1[0].connectedAt, - 'later connected peer is not used' + `later connected peer is not used: ${peers3[0].connectedAt} ${peers1[0].connectedAt}` ) { diff --git a/tests/sync/peer-sync-controller.js b/tests/sync/peer-sync-controller.js deleted file mode 100644 index 5d9e2c220..000000000 --- a/tests/sync/peer-sync-controller.js +++ /dev/null @@ -1,188 +0,0 @@ -// @ts-check - -import test from 'brittle' -import Hypercore from 'hypercore' -import { PeerSyncController } from '../../src/sync/peer-sync-controller.js' -import { createCoreManager } from '../helpers/core-manager.js' -import { KeyManager } from '@mapeo/crypto' -import { once } from 'node:events' -import { setTimeout } from 'node:timers/promises' -import { NAMESPACES } from '../../src/core-manager/index.js' -import { SyncState } from '../../src/sync/sync-state.js' -import { - BLOCKED_ROLE_ID, - CREATOR_CAPABILITIES, - DEFAULT_CAPABILITIES, -} from '../../src/capabilities.js' - -test('auth, config and blobIndex enabled by default', async (t) => { - const { - coreManagers: [cm1, cm2], - } = await testenv(CREATOR_CAPABILITIES) - - const preSyncNamespaces = /** @type {const} */ ([ - 'auth', - 'config', - 'blobIndex', - ]) - - const peerAddPromises = [] - for (const ns of preSyncNamespaces) { - peerAddPromises.push( - once(cm1.getWriterCore(ns).core, 'peer-add'), - once(cm1.getWriterCore(ns).core, 'peer-add') - ) - } - await Promise.all(peerAddPromises) - t.pass('pre-sync cores connected') - - // Wait to give other namespaces a chance to connect (they shouldn't) - await setTimeout(500) - - for (const ns of NAMESPACES) { - for (const cm of [cm1, cm2]) { - const nsCores = cm.getCores(ns) - t.is( - nsCores.length, - includes(preSyncNamespaces, ns) ? 2 : 1, - 'preSync namespaces have 2 cores, others have 1' - ) - for (const { core } of nsCores) { - if (includes(preSyncNamespaces, ns)) { - t.is(core.peers.length, 1, 'pre-sync namespace cores have one peer') - } else { - t.is(core.peers.length, 0, 'non-pre-sync cores have no peers') - } - } - } - } -}) - -test('enabling data sync replicates all cores', async (t) => { - const { - coreManagers: [cm1, cm2], - peerSyncControllers: [psc1, psc2], - } = await testenv(CREATOR_CAPABILITIES) - - psc1.enableDataSync() - psc2.enableDataSync() - - const peerAddPromises = [] - for (const ns of NAMESPACES) { - peerAddPromises.push( - once(cm1.getWriterCore(ns).core, 'peer-add'), - once(cm1.getWriterCore(ns).core, 'peer-add') - ) - } - await Promise.all(peerAddPromises) - - for (const ns of NAMESPACES) { - for (const [i, cm] of [cm1, cm2].entries()) { - const nsCores = cm.getCores(ns) - t.is(nsCores.length, 2, `cm${i + 1}: namespace ${ns} has 2 cores now`) - for (const { core } of nsCores) { - t.is( - core.peers.length, - 1, - `cm${i + 1}: ${ns} ${ - core === cm.getWriterCore(ns).core ? 'own' : 'synced' - } core is connected` - ) - } - } - } -}) - -test('no sync capabilities === no namespaces sync apart from auth', async (t) => { - const { - coreManagers: [cm1, cm2], - peerSyncControllers: [psc1, psc2], - } = await testenv(DEFAULT_CAPABILITIES[BLOCKED_ROLE_ID]) - - psc1.enableDataSync() - psc2.enableDataSync() - - // Wait to give cores a chance to connect - await setTimeout(500) - - for (const ns of NAMESPACES) { - for (const cm of [cm1, cm2]) { - const nsCores = cm.getCores(ns) - if (ns === 'auth') { - t.is(nsCores.length, 2, `all auth cores have been shared`) - // no guarantees about sharing of other cores yet - } - for (const { core } of nsCores) { - const isCreatorCore = core === cm.creatorCore - if (isCreatorCore) { - t.is(core.peers.length, 1, 'creator core remains connected') - } else { - t.is(core.peers.length, 0, 'core is disconnected') - } - } - } - } -}) - -/** - * - * @param {import('../../src/capabilities.js').Capability} cap - * @returns - */ -async function testenv(cap) { - const { publicKey: projectKey, secretKey: projectSecretKey } = - KeyManager.generateProjectKeypair() - const cm1 = await createCoreManager({ projectKey, projectSecretKey }) - const cm2 = await createCoreManager({ projectKey }) - - const stream1 = Hypercore.createProtocolStream(true, { - ondiscoverykey: (discoveryKey) => - cm1.handleDiscoveryKey(discoveryKey, stream1), - }) - const stream2 = Hypercore.createProtocolStream(false, { - ondiscoverykey: (discoveryKey) => - cm2.handleDiscoveryKey(discoveryKey, stream2), - }) - stream1.pipe(stream2).pipe(stream1) - - const psc1 = new PeerSyncController({ - protomux: stream1.noiseStream.userData, - coreManager: cm1, - syncState: new SyncState({ coreManager: cm1 }), - // @ts-expect-error - capabilities: { - async getCapabilities() { - return cap - }, - }, - }) - const psc2 = new PeerSyncController({ - protomux: stream2.noiseStream.userData, - coreManager: cm2, - syncState: new SyncState({ coreManager: cm2 }), - // @ts-expect-error - capabilities: { - async getCapabilities() { - return cap - }, - }, - }) - - return { - peerSyncControllers: [psc1, psc2], - coreManagers: [cm1, cm2], - } -} - -/** - * Helper for Typescript array.prototype.includes - * - * @template {U} T - * @template U - * @param {ReadonlyArray} coll - * @param {U} el - * @returns {el is T} - */ -function includes(coll, el) { - return coll.includes(/** @type {T} */ (el)) -} diff --git a/types/corestore.d.ts b/types/corestore.d.ts index a4bd6bfc2..41545c146 100644 --- a/types/corestore.d.ts +++ b/types/corestore.d.ts @@ -27,6 +27,7 @@ declare module 'corestore' { options: Omit & { key?: Buffer | string | undefined keyPair: { publicKey: Buffer; secretKey?: Buffer | undefined | null } + sparse?: boolean } ): Hypercore replicate: typeof Hypercore.prototype.replicate