diff --git a/proto/extensions.proto b/proto/extensions.proto
index 344b334b4..7bab5f61e 100644
--- a/proto/extensions.proto
+++ b/proto/extensions.proto
@@ -1,13 +1,7 @@
 syntax = "proto3";
 
 message ProjectExtension {
-  repeated bytes wantCoreKeys = 1;
-
-  repeated bytes authCoreKeys = 2;
-  repeated bytes configCoreKeys = 3;
-  repeated bytes dataCoreKeys = 4;
-  repeated bytes blobIndexCoreKeys = 5;
-  repeated bytes blobCoreKeys = 6;
+  repeated bytes authCoreKeys = 1;
 }
 
 message HaveExtension {
diff --git a/src/core-manager/index.js b/src/core-manager/index.js
index 9168449a1..431ec882c 100644
--- a/src/core-manager/index.js
+++ b/src/core-manager/index.js
@@ -6,9 +6,6 @@ import { HaveExtension, ProjectExtension } from '../generated/extensions.js'
 import { CoreIndex } from './core-index.js'
 import * as rle from './bitfield-rle.js'
 import { Logger } from '../logger.js'
-import { keyToId } from '../utils.js'
-import { discoveryKey } from 'hypercore-crypto'
-import Hypercore from 'hypercore'
 
 export const kCoreManagerReplicate = Symbol('replicate core manager')
 // WARNING: Changing these will break things for existing apps, since namespaces
@@ -55,12 +52,6 @@ export class CoreManager extends TypedEmitter {
   #haveExtension
   #deviceId
   #l
-  /**
-   * We use this to reduce network traffic caused by requesting the same key
-   * from multiple clients.
-   * TODO: Remove items from this set after a max age
-   */
-  #keyRequests = new TrackedKeyRequests()
   #autoDownload
 
   static get namespaces() {
@@ -155,8 +146,8 @@
       'mapeo/project',
       {
         encoding: ProjectExtensionCodec,
-        onmessage: (msg, peer) => {
-          this.#handleProjectMessage(msg, peer)
+        onmessage: (msg) => {
+          this.#handleProjectMessage(msg)
         },
       }
     )
@@ -168,16 +159,6 @@
       },
     })
 
-    this.#creatorCore.on('peer-add', (peer) => {
-      this.#sendHaves(peer)
-    })
-    this.#creatorCore.on('peer-remove', (peer) => {
-      // When a peer is removed we clean up any unanswered key requests, so that
-      // we will request from a different peer, and to avoid the tracking of key
-      // requests growing without bounds.
-      this.#keyRequests.deleteByPeerKey(peer.remotePublicKey)
-    })
-
     this.#ready = Promise.all(
       [...this.#coreIndex].map(({ core }) => core.ready())
     )
@@ -253,7 +234,6 @@
    */
   async close() {
     this.#state = 'closing'
-    this.#keyRequests.clear()
     const promises = []
     for (const { core } of this.#coreIndex) {
       promises.push(core.close())
@@ -342,69 +322,51 @@
   }
 
   /**
-   * Send an extension message over the project creator core replication stream
-   * requesting a core key for the given discovery key.
-   *
-   * @param {Buffer} peerKey
-   * @param {Buffer} discoveryKey
+   * @param {ProjectExtension} msg
    */
-  requestCoreKey(peerKey, discoveryKey) {
-    // No-op if we already have this core
-    if (this.getCoreByDiscoveryKey(discoveryKey)) return
-    const peer = this.#creatorCore.peers.find((peer) => {
-      return peer.remotePublicKey.equals(peerKey)
-    })
-    if (!peer) {
-      // This should not happen because this is only called from SyncApi, which
-      // checks the peer exists before calling this method.
-      this.#l.log(
-        'Attempted to request core key for %h, but no connected peer %h',
-        discoveryKey,
-        peerKey
-      )
-      return
+  #handleProjectMessage({ authCoreKeys }) {
+    for (const coreKey of authCoreKeys) {
+      // Use public method - these must be persisted (private method defaults to persisted=false)
+      this.addCore(coreKey, 'auth')
     }
-    // Only request a key once, e.g. from the peer we first receive it from (we
-    // can assume that a peer must have the key if we see the discovery key in
-    // the protomux). This is necessary to reduce network traffic for many newly
-    // connected peers - otherwise duplicate requests will be sent to every peer
-    if (this.#keyRequests.has(discoveryKey)) return
-    this.#keyRequests.set(discoveryKey, peerKey)
+  }
 
-    this.#l.log(
-      'Requesting core key for discovery key %h from peer %h',
-      discoveryKey,
-      peerKey
-    )
-    const message = ProjectExtension.fromPartial({
-      wantCoreKeys: [discoveryKey],
-    })
-    this.#projectExtension.send(message, peer)
+  /**
+   * Sends auth core keys to the given peer, skipping any keys that we know the
+   * peer already has (this depends on the peer having already replicated the
+   * auth cores it has).
+   *
+   * @param {any} peer
+   */
+  sendAuthCoreKeys(peer) {
+    this.#sendCoreKeys(peer, ['auth'])
   }
 
   /**
-   * @param {ProjectExtension} msg
+   * Core keys for non-auth namespaces are only sent to a peer for unit tests;
+   * during normal operation only auth core keys are sent.
+   *
    * @param {any} peer
+   * @param {Readonly<Namespace[]>} namespaces
    */
-  #handleProjectMessage({ wantCoreKeys, ...coreKeys }, peer) {
+  #sendCoreKeys(peer, namespaces) {
     const message = ProjectExtension.create()
     let hasKeys = false
-    for (const discoveryKey of wantCoreKeys) {
-      const coreRecord = this.getCoreByDiscoveryKey(discoveryKey)
-      if (!coreRecord) continue
-      message[`${coreRecord.namespace}CoreKeys`].push(coreRecord.key)
-      hasKeys = true
+    for (const ns of namespaces) {
+      for (const { core, key } of this.getCores(ns)) {
+        let peerHasKeyAlready = false
+        for (const corePeer of core.peers) {
+          if (corePeer.remotePublicKey.equals(peer.remotePublicKey)) {
+            peerHasKeyAlready = true
+            break
+          }
+        }
+        if (peerHasKeyAlready) continue
+        message.authCoreKeys.push(key)
+        hasKeys = true
+      }
     }
     if (hasKeys) {
      this.#projectExtension.send(message, peer)
     }
-    for (const namespace of NAMESPACES) {
-      for (const coreKey of coreKeys[`${namespace}CoreKeys`]) {
-        // Use public method - these must be persisted (private method defaults to persisted=false)
-        this.addCore(coreKey, namespace)
-        this.#keyRequests.deleteByDiscoveryKey(discoveryKey(coreKey))
-      }
-    }
   }
 
   /**
@@ -426,21 +388,17 @@
   }
 
   /**
-   *
    * @param {any} peer
+   * @param {Exclude<Namespace, 'auth'>} namespace
    */
-  async #sendHaves(peer) {
-    if (!peer) {
-      console.warn('sendHaves no peer', peer.remotePublicKey)
-      // TODO: How to handle this and when does it happen?
-      return
-    }
+  async sendHaves(peer, namespace) {
+    // We want ready() rather than update() because we are only interested in
+    // local data. This waits for all cores to be ready.
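+    // (ready() resolves once core.ready() has resolved for every core in this
+    // manager's core index - see #ready in the constructor.)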
+    await this.ready()
     peer.protomux.cork()
 
-    for (const { core, namespace } of this.#coreIndex) {
-      // We want ready() rather than update() because we are only interested in local data
-      await core.ready()
+    for (const { core } of this.getCores(namespace)) {
       if (core.length === 0) continue
       const { discoveryKey } = core
       // This will always be defined after ready(), but need to let TS know
@@ -465,18 +423,14 @@
    * @returns
    */
   [kCoreManagerReplicate](stream) {
-    const protocolStream = Hypercore.createProtocolStream(stream, {
-      ondiscoverykey: async (discoveryKey) => {
-        const peer = await findPeer(
-          this.creatorCore,
-          // @ts-ignore
-          protocolStream.noiseStream.remotePublicKey
-        )
-        if (!peer) return
-        this.requestCoreKey(peer.remotePublicKey, discoveryKey)
-      },
+    const protocolStream = this.#corestore.replicate(stream)
+    this.#creatorCore.on('peer-add', (peer) => {
+      // Normally only auth core keys are sent, but for unit tests we need to
+      // send all of them, because unit tests don't include the Sync API, which
+      // adds cores from core ownership records.
+      this.#sendCoreKeys(peer, NAMESPACES)
     })
-    return this.#corestore.replicate(stream)
+    return protocolStream
   }
 }
@@ -523,93 +477,3 @@ const HaveExtensionCodec = {
     }
   },
 }
-
-class TrackedKeyRequests {
-  /** @type {Map<string, string>} */
-  #byDiscoveryId = new Map()
-  /** @type {Map<string, Set<string>>} */
-  #byPeerId = new Map()
-
-  /**
-   * @param {Buffer} discoveryKey
-   * @param {Buffer} peerKey
-   */
-  set(discoveryKey, peerKey) {
-    const discoveryId = keyToId(discoveryKey)
-    const peerId = keyToId(peerKey)
-    const existingForPeer = this.#byPeerId.get(peerId) || new Set()
-    this.#byDiscoveryId.set(discoveryId, peerId)
-    existingForPeer.add(discoveryId)
-    this.#byPeerId.set(peerId, existingForPeer)
-    return this
-  }
-  /**
-   * @param {Buffer} discoveryKey
-   */
-  has(discoveryKey) {
-    const discoveryId = keyToId(discoveryKey)
-    return this.#byDiscoveryId.has(discoveryId)
-  }
-  /**
-   * @param {Buffer} discoveryKey
-   */
-  deleteByDiscoveryKey(discoveryKey) {
-    const discoveryId = keyToId(discoveryKey)
-    const peerId = this.#byDiscoveryId.get(discoveryId)
-    if (!peerId) return false
-    this.#byDiscoveryId.delete(discoveryId)
-    const existingForPeer = this.#byPeerId.get(peerId)
-    if (existingForPeer) {
-      existingForPeer.delete(discoveryId)
-    }
-    return true
-  }
-  /**
-   * @param {Buffer} peerKey
-   */
-  deleteByPeerKey(peerKey) {
-    const peerId = keyToId(peerKey)
-    const existingForPeer = this.#byPeerId.get(peerId)
-    if (!existingForPeer) return
-    for (const discoveryId of existingForPeer) {
-      this.#byDiscoveryId.delete(discoveryId)
-    }
-    this.#byPeerId.delete(peerId)
-  }
-  clear() {
-    this.#byDiscoveryId.clear()
-    this.#byPeerId.clear()
-  }
-}
-
-/**
- * @param {Hypercore<"binary", Buffer>} core
- * @param {Buffer} publicKey
- * @param {{ timeout?: number }} [opts]
- */
-function findPeer(core, publicKey, { timeout = 200 } = {}) {
-  const peer = core.peers.find((peer) => {
-    return peer.remotePublicKey.equals(publicKey)
-  })
-  if (peer) return peer
-  // This is called from the from the handleDiscoveryId event, which can
-  // happen before the peer connection is fully established, so we wait for
-  // the `peer-add` event, with a timeout in case the peer never gets added
-  return new Promise(function (res) {
-    const timeoutId = setTimeout(function () {
-      core.off('peer-add', onPeer)
-      res(null)
-    }, timeout)
-
-    core.on('peer-add', onPeer)
-
-    /** @param {any} peer */
-    function onPeer(peer) {
-      if (peer.remotePublicKey.equals(publicKey)) {
-        clearTimeout(timeoutId)
-        core.off('peer-add', onPeer)
-        res(peer)
-      }
-    }
-  })
-}
diff --git a/src/core-ownership.js b/src/core-ownership.js
index 3a4608982..49f708820 100644
--- a/src/core-ownership.js
+++ b/src/core-ownership.js
@@ -75,17 +75,23 @@ export class CoreOwnership {
   }
 
   /**
-   *
    * @param {string} deviceId
    * @param {typeof NAMESPACES[number]} namespace
    * @returns {Promise<string>} coreId of core belonging to `deviceId` for `namespace`
    */
   async getCoreId(deviceId, namespace) {
-    await this.#ownershipWriteDone
-    const result = await this.#dataType.getByDocId(deviceId)
+    const result = await this.get(deviceId)
     return result[`${namespace}CoreId`]
   }
 
+  /**
+   * @param {string} deviceId
+   */
+  async get(deviceId) {
+    await this.#ownershipWriteDone
+    return this.#dataType.getByDocId(deviceId)
+  }
+
   /**
    *
    * @param {import('./types.js').KeyPair} identityKeypair
diff --git a/src/datastore/index.js b/src/datastore/index.js
index a2e0ba01e..68e96dc36 100644
--- a/src/datastore/index.js
+++ b/src/datastore/index.js
@@ -1,4 +1,3 @@
-import { TypedEmitter } from 'tiny-typed-emitter'
 import { encode, decode, getVersionId, parseVersionId } from '@mapeo/schema'
 import MultiCoreIndexer from 'multi-core-indexer'
 import pDefer from 'p-defer'
@@ -30,9 +29,8 @@ const NAMESPACE_SCHEMAS = /** @type {const} */ ({
 /**
  * @template {keyof NamespaceSchemas} [TNamespace=keyof NamespaceSchemas]
  * @template {NamespaceSchemas[TNamespace][number]} [TSchemaName=NamespaceSchemas[TNamespace][number]]
- * @extends {TypedEmitter}
 */
-export class DataStore extends TypedEmitter {
+export class DataStore {
   #coreManager
   #namespace
   #batch
@@ -51,7 +49,6 @@
    * @param {MultiCoreIndexer.StorageParam} opts.storage
    */
   constructor({ coreManager, namespace, batch, storage }) {
-    super()
     this.#coreManager = coreManager
     this.#namespace = namespace
     this.#batch = batch
diff --git a/src/generated/extensions.d.ts b/src/generated/extensions.d.ts
index 43d4b4c46..da0ed8aec 100644
--- a/src/generated/extensions.d.ts
+++ b/src/generated/extensions.d.ts
@@ -1,12 +1,7 @@
 /// <reference types="node" />
 import _m0 from "protobufjs/minimal.js";
 export interface ProjectExtension {
-    wantCoreKeys: Buffer[];
     authCoreKeys: Buffer[];
-    configCoreKeys: Buffer[];
-    dataCoreKeys: Buffer[];
-    blobIndexCoreKeys: Buffer[];
-    blobCoreKeys: Buffer[];
 }
 export interface HaveExtension {
     discoveryKey: Buffer;
@@ -29,35 +24,15 @@ export declare const ProjectExtension: {
     encode(message: ProjectExtension, writer?: _m0.Writer): _m0.Writer;
     decode(input: _m0.Reader | Uint8Array, length?: number): ProjectExtension;
     create<I extends {
-        wantCoreKeys?: Buffer[] & Buffer[] & { [K in Exclude<keyof I["wantCoreKeys"], keyof Buffer[]>]: never; };
-        authCoreKeys?: Buffer[] & Buffer[] & { [K_1 in Exclude<keyof I["authCoreKeys"], keyof Buffer[]>]: never; };
-        configCoreKeys?: Buffer[] & Buffer[] & { [K_2 in Exclude<keyof I["configCoreKeys"], keyof Buffer[]>]: never; };
-        dataCoreKeys?: Buffer[] & Buffer[] & { [K_3 in Exclude<keyof I["dataCoreKeys"], keyof Buffer[]>]: never; };
-        blobIndexCoreKeys?: Buffer[] & Buffer[] & { [K_4 in Exclude<keyof I["blobIndexCoreKeys"], keyof Buffer[]>]: never; };
-        blobCoreKeys?: Buffer[] & Buffer[] & { [K_5 in Exclude<keyof I["blobCoreKeys"], keyof Buffer[]>]: never; };
-    } & { [K_6 in Exclude<keyof I, keyof ProjectExtension>]: never; }>(base?: I): ProjectExtension;
+        authCoreKeys?: Buffer[] & Buffer[] & { [K in Exclude<keyof I["authCoreKeys"], keyof Buffer[]>]: never; };
+    } & { [K_1 in Exclude<keyof I, keyof ProjectExtension>]: never; }>(base?: I): ProjectExtension;
     fromPartial<I_1 extends {
-        wantCoreKeys?: Buffer[] & Buffer[] & { [K_7 in Exclude<keyof I_1["wantCoreKeys"], keyof Buffer[]>]: never; };
-        authCoreKeys?: Buffer[] & Buffer[] & { [K_8 in Exclude<keyof I_1["authCoreKeys"], keyof Buffer[]>]: never; };
-        configCoreKeys?: Buffer[] & Buffer[] & { [K_9 in Exclude<keyof I_1["configCoreKeys"], keyof Buffer[]>]: never; };
-        dataCoreKeys?: Buffer[] & Buffer[] & { [K_10 in Exclude<keyof I_1["dataCoreKeys"], keyof Buffer[]>]: never; };
-        blobIndexCoreKeys?: Buffer[] & Buffer[] & { [K_11 in Exclude<keyof I_1["blobIndexCoreKeys"], keyof Buffer[]>]: never; };
-        blobCoreKeys?: Buffer[] & Buffer[] & { [K_12 in Exclude<keyof I_1["blobCoreKeys"], keyof Buffer[]>]: never; };
-    } & { [K_13 in Exclude<keyof I_1, keyof ProjectExtension>]: never; }>(object: I_1): ProjectExtension;
+        authCoreKeys?: Buffer[] & Buffer[] & { [K_2 in Exclude<keyof I_1["authCoreKeys"], keyof Buffer[]>]: never; };
+    } & { [K_3 in Exclude<keyof I_1, keyof ProjectExtension>]: never; }>(object: I_1): ProjectExtension;
 };
 export declare const HaveExtension: {
     encode(message: HaveExtension, writer?: _m0.Writer): _m0.Writer;
diff --git a/src/generated/extensions.js b/src/generated/extensions.js
index c51e6dae7..7fe53088e 100644
--- a/src/generated/extensions.js
+++ b/src/generated/extensions.js
@@ -1,258 +1,222 @@
 /* eslint-disable */
-import Long from "long";
-import _m0 from "protobufjs/minimal.js";
+import Long from 'long'
+import _m0 from 'protobufjs/minimal.js'
 export var HaveExtension_Namespace = {
-    auth: "auth",
-    config: "config",
-    data: "data",
-    blobIndex: "blobIndex",
-    blob: "blob",
-    UNRECOGNIZED: "UNRECOGNIZED",
-};
+  auth: 'auth',
+  config: 'config',
+  data: 'data',
+  blobIndex: 'blobIndex',
+  blob: 'blob',
+  UNRECOGNIZED: 'UNRECOGNIZED',
+}
 export function haveExtension_NamespaceFromJSON(object) {
-    switch (object) {
-        case 0:
-        case "auth":
-            return HaveExtension_Namespace.auth;
-        case 1:
-        case "config":
-            return HaveExtension_Namespace.config;
-        case 2:
-        case "data":
-            return HaveExtension_Namespace.data;
-        case 3:
-        case "blobIndex":
-            return HaveExtension_Namespace.blobIndex;
-        case 4:
-        case "blob":
-            return HaveExtension_Namespace.blob;
-        case -1:
-        case "UNRECOGNIZED":
-        default:
-            return HaveExtension_Namespace.UNRECOGNIZED;
-    }
+  switch (object) {
+    case 0:
+    case 'auth':
+      return HaveExtension_Namespace.auth
+    case 1:
+    case 'config':
+      return HaveExtension_Namespace.config
+    case 2:
+    case 'data':
+      return HaveExtension_Namespace.data
+    case 3:
+    case 'blobIndex':
+      return HaveExtension_Namespace.blobIndex
+    case 4:
+    case 'blob':
+      return HaveExtension_Namespace.blob
+    case -1:
+    case 'UNRECOGNIZED':
+    default:
+      return HaveExtension_Namespace.UNRECOGNIZED
+  }
 }
 export function haveExtension_NamespaceToNumber(object) {
-    switch (object) {
-        case HaveExtension_Namespace.auth:
-            return 0;
-        case HaveExtension_Namespace.config:
-            return 1;
-        case HaveExtension_Namespace.data:
-            return 2;
-        case HaveExtension_Namespace.blobIndex:
-            return 3;
-        case HaveExtension_Namespace.blob:
-            return 4;
-        case HaveExtension_Namespace.UNRECOGNIZED:
-        default:
-            return -1;
-    }
+  switch (object) {
+    case HaveExtension_Namespace.auth:
+      return 0
+    case HaveExtension_Namespace.config:
+      return 1
+    case HaveExtension_Namespace.data:
+      return 2
+    case HaveExtension_Namespace.blobIndex:
+      return 3
+    case HaveExtension_Namespace.blob:
+      return 4
+    case HaveExtension_Namespace.UNRECOGNIZED:
+    default:
+      return -1
+  }
 }
 function createBaseProjectExtension() {
-    return {
-        wantCoreKeys: [],
-        authCoreKeys: [],
-        configCoreKeys: [],
-        dataCoreKeys: [],
-        blobIndexCoreKeys: [],
-        blobCoreKeys: [],
-    };
+  return { authCoreKeys: [] }
 }
 export var ProjectExtension = {
-    encode: function (message, writer) {
-        if (writer === void 0) { writer = _m0.Writer.create(); }
-        for (var _i = 0, _a = message.wantCoreKeys; _i < _a.length; _i++) {
-            var v = _a[_i];
-            writer.uint32(10).bytes(v);
-        }
-        for (var _b = 0, _c = message.authCoreKeys; _b < _c.length; _b++) {
-            var v = _c[_b];
-            writer.uint32(18).bytes(v);
-        }
-        for (var _d = 0, _e = message.configCoreKeys; _d < _e.length; _d++) {
-            var v = _e[_d];
-            writer.uint32(26).bytes(v);
-        }
-        for (var _f = 0, _g = message.dataCoreKeys; _f < _g.length; _f++) {
-            var v = _g[_f];
-            writer.uint32(34).bytes(v);
-        }
-        for (var _h = 0, _j = message.blobIndexCoreKeys; _h < _j.length; _h++) {
-            var v = _j[_h];
-            writer.uint32(42).bytes(v);
-        }
-        for (var _k = 0, _l = message.blobCoreKeys; _k < _l.length; _k++) {
-            var v = _l[_k];
-            writer.uint32(50).bytes(v);
-        }
-        return writer;
-    },
-    decode: function (input, length) {
-        var reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
-        var end = length === undefined ? reader.len : reader.pos + length;
-        var message = createBaseProjectExtension();
-        while (reader.pos < end) {
-            var tag = reader.uint32();
-            switch (tag >>> 3) {
-                case 1:
-                    if (tag !== 10) {
-                        break;
-                    }
-                    message.wantCoreKeys.push(reader.bytes());
-                    continue;
-                case 2:
-                    if (tag !== 18) {
-                        break;
-                    }
-                    message.authCoreKeys.push(reader.bytes());
-                    continue;
-                case 3:
-                    if (tag !== 26) {
-                        break;
-                    }
-                    message.configCoreKeys.push(reader.bytes());
-                    continue;
-                case 4:
-                    if (tag !== 34) {
-                        break;
-                    }
-                    message.dataCoreKeys.push(reader.bytes());
-                    continue;
-                case 5:
-                    if (tag !== 42) {
-                        break;
-                    }
-                    message.blobIndexCoreKeys.push(reader.bytes());
-                    continue;
-                case 6:
-                    if (tag !== 50) {
-                        break;
-                    }
-                    message.blobCoreKeys.push(reader.bytes());
-                    continue;
-            }
-            if ((tag & 7) === 4 || tag === 0) {
-                break;
-            }
-            reader.skipType(tag & 7);
-        }
-        return message;
-    },
-    create: function (base) {
-        return ProjectExtension.fromPartial(base !== null && base !== void 0 ? base : {});
-    },
-    fromPartial: function (object) {
-        var _a, _b, _c, _d, _e, _f;
-        var message = createBaseProjectExtension();
-        message.wantCoreKeys = ((_a = object.wantCoreKeys) === null || _a === void 0 ? void 0 : _a.map(function (e) { return e; })) || [];
-        message.authCoreKeys = ((_b = object.authCoreKeys) === null || _b === void 0 ? void 0 : _b.map(function (e) { return e; })) || [];
-        message.configCoreKeys = ((_c = object.configCoreKeys) === null || _c === void 0 ? void 0 : _c.map(function (e) { return e; })) || [];
-        message.dataCoreKeys = ((_d = object.dataCoreKeys) === null || _d === void 0 ? void 0 : _d.map(function (e) { return e; })) || [];
-        message.blobIndexCoreKeys = ((_e = object.blobIndexCoreKeys) === null || _e === void 0 ? void 0 : _e.map(function (e) { return e; })) || [];
-        message.blobCoreKeys = ((_f = object.blobCoreKeys) === null || _f === void 0 ? void 0 : _f.map(function (e) { return e; })) || [];
-        return message;
-    },
-};
+  encode: function (message, writer) {
+    if (writer === void 0) {
+      writer = _m0.Writer.create()
+    }
+    for (var _i = 0, _a = message.authCoreKeys; _i < _a.length; _i++) {
+      var v = _a[_i]
+      writer.uint32(10).bytes(v)
+    }
+    return writer
+  },
+  decode: function (input, length) {
+    var reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input)
+    var end = length === undefined ? reader.len : reader.pos + length
+    var message = createBaseProjectExtension()
+    while (reader.pos < end) {
+      var tag = reader.uint32()
+      switch (tag >>> 3) {
+        case 1:
+          if (tag !== 10) {
+            break
+          }
+          message.authCoreKeys.push(reader.bytes())
+          continue
+      }
+      if ((tag & 7) === 4 || tag === 0) {
+        break
+      }
+      reader.skipType(tag & 7)
+    }
+    return message
+  },
+  create: function (base) {
+    return ProjectExtension.fromPartial(
+      base !== null && base !== void 0 ? base : {}
+    )
+  },
+  fromPartial: function (object) {
+    var _a
+    var message = createBaseProjectExtension()
+    message.authCoreKeys =
+      ((_a = object.authCoreKeys) === null || _a === void 0
+        ? void 0
+        : _a.map(function (e) {
+            return e
+          })) || []
+    return message
+  },
+}
 function createBaseHaveExtension() {
-    return {
-        discoveryKey: Buffer.alloc(0),
-        start: 0,
-        encodedBitfield: Buffer.alloc(0),
-        namespace: HaveExtension_Namespace.auth,
-    };
+  return {
+    discoveryKey: Buffer.alloc(0),
+    start: 0,
+    encodedBitfield: Buffer.alloc(0),
+    namespace: HaveExtension_Namespace.auth,
+  }
 }
 export var HaveExtension = {
-    encode: function (message, writer) {
-        if (writer === void 0) { writer = _m0.Writer.create(); }
-        if (message.discoveryKey.length !== 0) {
-            writer.uint32(10).bytes(message.discoveryKey);
-        }
-        if (message.start !== 0) {
-            writer.uint32(16).uint64(message.start);
-        }
-        if (message.encodedBitfield.length !== 0) {
-            writer.uint32(26).bytes(message.encodedBitfield);
-        }
-        if (message.namespace !== HaveExtension_Namespace.auth) {
-            writer.uint32(32).int32(haveExtension_NamespaceToNumber(message.namespace));
-        }
-        return writer;
-    },
-    decode: function (input, length) {
-        var reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
-        var end = length === undefined ? reader.len : reader.pos + length;
-        var message = createBaseHaveExtension();
-        while (reader.pos < end) {
-            var tag = reader.uint32();
-            switch (tag >>> 3) {
-                case 1:
-                    if (tag !== 10) {
-                        break;
-                    }
-                    message.discoveryKey = reader.bytes();
-                    continue;
-                case 2:
-                    if (tag !== 16) {
-                        break;
-                    }
-                    message.start = longToNumber(reader.uint64());
-                    continue;
-                case 3:
-                    if (tag !== 26) {
-                        break;
-                    }
-                    message.encodedBitfield = reader.bytes();
-                    continue;
-                case 4:
-                    if (tag !== 32) {
-                        break;
-                    }
-                    message.namespace = haveExtension_NamespaceFromJSON(reader.int32());
-                    continue;
-            }
-            if ((tag & 7) === 4 || tag === 0) {
-                break;
-            }
-            reader.skipType(tag & 7);
-        }
-        return message;
-    },
-    create: function (base) {
-        return HaveExtension.fromPartial(base !== null && base !== void 0 ? base : {});
-    },
-    fromPartial: function (object) {
-        var _a, _b, _c, _d;
-        var message = createBaseHaveExtension();
-        message.discoveryKey = (_a = object.discoveryKey) !== null && _a !== void 0 ? _a : Buffer.alloc(0);
-        message.start = (_b = object.start) !== null && _b !== void 0 ? _b : 0;
-        message.encodedBitfield = (_c = object.encodedBitfield) !== null && _c !== void 0 ? _c : Buffer.alloc(0);
-        message.namespace = (_d = object.namespace) !== null && _d !== void 0 ? _d : HaveExtension_Namespace.auth;
-        return message;
-    },
-};
-var tsProtoGlobalThis = (function () {
-    if (typeof globalThis !== "undefined") {
-        return globalThis;
+  encode: function (message, writer) {
+    if (writer === void 0) {
+      writer = _m0.Writer.create()
     }
-    if (typeof self !== "undefined") {
-        return self;
+    if (message.discoveryKey.length !== 0) {
+      writer.uint32(10).bytes(message.discoveryKey)
     }
-    if (typeof window !== "undefined") {
-        return window;
+    if (message.start !== 0) {
+      writer.uint32(16).uint64(message.start)
     }
-    if (typeof global !== "undefined") {
-        return global;
+    if (message.encodedBitfield.length !== 0) {
+      writer.uint32(26).bytes(message.encodedBitfield)
     }
-    throw "Unable to locate global object";
-})();
-function longToNumber(long) {
-    if (long.gt(Number.MAX_SAFE_INTEGER)) {
-        throw new tsProtoGlobalThis.Error("Value is larger than Number.MAX_SAFE_INTEGER");
+    if (message.namespace !== HaveExtension_Namespace.auth) {
+      writer
+        .uint32(32)
+        .int32(haveExtension_NamespaceToNumber(message.namespace))
     }
-    return long.toNumber();
+    return writer
+  },
+  decode: function (input, length) {
+    var reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input)
+    var end = length === undefined ? reader.len : reader.pos + length
+    var message = createBaseHaveExtension()
+    while (reader.pos < end) {
+      var tag = reader.uint32()
+      switch (tag >>> 3) {
+        case 1:
+          if (tag !== 10) {
+            break
+          }
+          message.discoveryKey = reader.bytes()
+          continue
+        case 2:
+          if (tag !== 16) {
+            break
+          }
+          message.start = longToNumber(reader.uint64())
+          continue
+        case 3:
+          if (tag !== 26) {
+            break
+          }
+          message.encodedBitfield = reader.bytes()
+          continue
+        case 4:
+          if (tag !== 32) {
+            break
+          }
+          message.namespace = haveExtension_NamespaceFromJSON(reader.int32())
+          continue
+      }
+      if ((tag & 7) === 4 || tag === 0) {
+        break
+      }
+      reader.skipType(tag & 7)
+    }
+    return message
+  },
+  create: function (base) {
+    return HaveExtension.fromPartial(
+      base !== null && base !== void 0 ? base : {}
+    )
+  },
+  fromPartial: function (object) {
+    var _a, _b, _c, _d
+    var message = createBaseHaveExtension()
+    message.discoveryKey =
+      (_a = object.discoveryKey) !== null && _a !== void 0
+        ? _a
+        : Buffer.alloc(0)
+    message.start = (_b = object.start) !== null && _b !== void 0 ? _b : 0
+    message.encodedBitfield =
+      (_c = object.encodedBitfield) !== null && _c !== void 0
+        ? _c
+        : Buffer.alloc(0)
+    message.namespace =
+      (_d = object.namespace) !== null && _d !== void 0
+        ? _d
+        : HaveExtension_Namespace.auth
+    return message
+  },
+}
+var tsProtoGlobalThis = (function () {
+  if (typeof globalThis !== 'undefined') {
+    return globalThis
+  }
+  if (typeof self !== 'undefined') {
+    return self
+  }
+  if (typeof window !== 'undefined') {
+    return window
+  }
+  if (typeof global !== 'undefined') {
+    return global
+  }
+  throw 'Unable to locate global object'
+})()
+function longToNumber(long) {
+  if (long.gt(Number.MAX_SAFE_INTEGER)) {
+    throw new tsProtoGlobalThis.Error(
+      'Value is larger than Number.MAX_SAFE_INTEGER'
+    )
+  }
+  return long.toNumber()
+}
 if (_m0.util.Long !== Long) {
-    _m0.util.Long = Long;
-    _m0.configure();
+  _m0.util.Long = Long
+  _m0.configure()
 }
diff --git a/src/generated/extensions.ts b/src/generated/extensions.ts
index 6c3d45a00..eddb7efeb 100644
--- a/src/generated/extensions.ts
+++ b/src/generated/extensions.ts
@@ -3,12 +3,7 @@ import Long from "long";
 import _m0 from "protobufjs/minimal.js";
 
 export interface ProjectExtension {
-  wantCoreKeys: Buffer[];
   authCoreKeys: Buffer[];
-  configCoreKeys: Buffer[];
-  dataCoreKeys: Buffer[];
-  blobIndexCoreKeys: Buffer[];
-  blobCoreKeys: Buffer[];
 }
 
 export interface HaveExtension {
@@ -72,35 +67,13 @@ export function haveExtension_NamespaceToNumber(object: HaveExtension_Namespace): number {
 }
 
 function createBaseProjectExtension(): ProjectExtension {
-  return {
-    wantCoreKeys: [],
-    authCoreKeys: [],
-    configCoreKeys: [],
-    dataCoreKeys: [],
-    blobIndexCoreKeys: [],
-    blobCoreKeys: [],
-  };
+  return { authCoreKeys: [] };
 }
 
 export const ProjectExtension = {
   encode(message: ProjectExtension, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
-    for (const v of message.wantCoreKeys) {
-      writer.uint32(10).bytes(v!);
-    }
     for (const v of message.authCoreKeys) {
-      writer.uint32(18).bytes(v!);
-    }
-    for (const v of message.configCoreKeys) {
-      writer.uint32(26).bytes(v!);
-    }
-    for (const v of message.dataCoreKeys) {
-      writer.uint32(34).bytes(v!);
-    }
-    for (const v of message.blobIndexCoreKeys) {
-      writer.uint32(42).bytes(v!);
-    }
-    for (const v of message.blobCoreKeys) {
-      writer.uint32(50).bytes(v!);
+      writer.uint32(10).bytes(v!);
     }
     return writer;
   },
@@ -117,43 +90,8 @@ export const ProjectExtension = {
           break;
         }
 
-        message.wantCoreKeys.push(reader.bytes() as Buffer);
-        continue;
-      case 2:
-        if (tag !== 18) {
-          break;
-        }
-
         message.authCoreKeys.push(reader.bytes() as Buffer);
         continue;
-      case 3:
-        if (tag !== 26) {
-          break;
-        }
-
-        message.configCoreKeys.push(reader.bytes() as Buffer);
-        continue;
-      case 4:
-        if (tag !== 34) {
-          break;
-        }
-
-        message.dataCoreKeys.push(reader.bytes() as Buffer);
-        continue;
-      case 5:
-        if (tag !== 42) {
-          break;
-        }
-
-        message.blobIndexCoreKeys.push(reader.bytes() as Buffer);
-        continue;
-      case 6:
-        if (tag !== 50) {
-          break;
-        }
-
-        message.blobCoreKeys.push(reader.bytes() as Buffer);
-        continue;
       }
       if ((tag & 7) === 4 || tag === 0) {
        break;
@@ -168,12 +106,7 @@ export const ProjectExtension = {
   },
   fromPartial<I extends Exact<DeepPartial<ProjectExtension>, I>>(object: I): ProjectExtension {
     const message = createBaseProjectExtension();
-    message.wantCoreKeys = object.wantCoreKeys?.map((e) => e) || [];
     message.authCoreKeys = object.authCoreKeys?.map((e) => e) || [];
-    message.configCoreKeys = object.configCoreKeys?.map((e) => e) || [];
-    message.dataCoreKeys = object.dataCoreKeys?.map((e) => e) || [];
-    message.blobIndexCoreKeys = object.blobIndexCoreKeys?.map((e) => e) || [];
-    message.blobCoreKeys = object.blobCoreKeys?.map((e) => e) || [];
     return message;
   },
 };
diff --git a/src/index-writer/index.js b/src/index-writer/index.js
index a48c5fde1..f7a857a09 100644
--- a/src/index-writer/index.js
+++ b/src/index-writer/index.js
@@ -4,6 +4,7 @@ import { getTableConfig } from 'drizzle-orm/sqlite-core'
 import { getBacklinkTableName } from '../schema/utils.js'
 import { discoveryKey } from 'hypercore-crypto'
 import { Logger } from '../logger.js'
+import { TypedEmitter } from 'tiny-typed-emitter'
 
 /**
  * @typedef {import('../datatype/index.js').MapeoDocTables} MapeoDocTables
@@ -14,6 +15,12 @@ import { Logger } from '../logger.js'
 /**
  * @typedef {ReturnType<typeof import('@mapeo/schema').decode>} MapeoDocInternal
 */
+/**
+ * @template {import('@mapeo/schema').MapeoDoc['schemaName']} TSchemaName
+ * @typedef {{
+ *   [S in TSchemaName]: (docs: Extract<import('@mapeo/schema').MapeoDoc, { schemaName: S }>[]) => void
+ * }} SchemaEmitterEvents
+ */
 
 /**
  * @template {MapeoDocTables} [TTables=MapeoDocTables]
@@ -22,6 +29,8 @@ export class IndexWriter {
   /** @type {Map<string, SqliteIndexer>} */
   #indexers = new Map()
   #mapDoc
+  /** @type {TypedEmitter<SchemaEmitterEvents<import('@mapeo/schema').MapeoDoc['schemaName']>> | undefined} */
+  #schemaEmitter
   #l
   /**
    *
@@ -53,6 +62,20 @@ export class IndexWriter {
     return [...this.#indexers.keys()]
   }
 
+  /**
+   * @template {keyof SchemaEmitterEvents<import('@mapeo/schema').MapeoDoc['schemaName']>} TSchemaName
+   * @param {{ schemaName: TSchemaName }} opts
+   * @param {SchemaEmitterEvents<TSchemaName>[TSchemaName]} onDocs
+   */
+  subscribe({ schemaName }, onDocs) {
+    if (!this.#schemaEmitter) this.#schemaEmitter = new TypedEmitter()
+    const schemaEmitter = this.#schemaEmitter
+    schemaEmitter.on(schemaName, onDocs)
+    return function unsubscribe() {
+      schemaEmitter.off(schemaName, onDocs)
+    }
+  }
+
   /**
    *
    * @param {import('multi-core-indexer').Entry[]} entries
@@ -60,8 +83,8 @@
   async batch(entries) {
     // sqlite-indexer is _significantly_ faster when batching even <10 at a
     // time, so best to queue docs here before calling sliteIndexer.batch()
-    /** @type {Record<string, MapeoDocInternal[]>} */
-    const queued = {}
+    /** @type {Map<string, MapeoDocInternal[]>} */
+    const queued = new Map()
     for (const { block, key, index } of entries) {
       try {
         const version = { coreDiscoveryKey: discoveryKey(key), index }
@@ -71,19 +94,24 @@
         // Unknown or invalid entry - silently ignore
         continue
       }
-      if (queued[doc.schemaName]) {
-        queued[doc.schemaName].push(doc)
+      if (!this.#indexers.has(doc.schemaName)) continue
+      const queue = queued.get(doc.schemaName)
+      if (queue) {
+        queue.push(doc)
       } else {
-        queued[doc.schemaName] = [doc]
+        queued.set(doc.schemaName, [doc])
       }
     }
-    for (const [schemaName, docs] of Object.entries(queued)) {
-      // @ts-expect-error
+    for (const [schemaName, docs] of queued.entries()) {
       const indexer = this.#indexers.get(schemaName)
       if (!indexer) {
         // Don't have an indexer for this type - silently ignore
         continue
       }
+      if (this.#schemaEmitter) {
+        // @ts-ignore
+        this.#schemaEmitter.emit(schemaName, docs)
+      }
       indexer.batch(docs)
       if (this.#l.enabled) {
         for (const doc of docs) {
diff --git a/src/logger.js b/src/logger.js
index 80de7b9db..72d9f5b96 100644
--- a/src/logger.js
+++ b/src/logger.js
@@ -59,6 +59,7 @@
   static create(ns, logger) {
     if (logger) return logger.extend(ns)
     const i = (counts.get(ns) || 0) + 1
+    counts.set(ns, i)
     const deviceId = String(i).padStart(TRIM, '0')
     return new Logger({ deviceId, ns })
   }
diff --git a/src/mapeo-project.js b/src/mapeo-project.js
index 335af9bdf..1bb64cb74 100644
--- a/src/mapeo-project.js
+++ b/src/mapeo-project.js
@@ -283,6 +283,8 @@
     this.#syncApi = new SyncApi({
       coreManager: this.#coreManager,
       capabilities: this.#capabilities,
+      coreOwnership: this.#coreOwnership,
+      indexWriter,
       logger: this.#l,
     })
diff --git a/src/sync/peer-sync-controller.js b/src/sync/peer-sync-controller.js
index d1d55aa09..87078d9dc 100644
--- a/src/sync/peer-sync-controller.js
+++ b/src/sync/peer-sync-controller.js
@@ -12,12 +12,15 @@ import { Logger } from '../logger.js'
 /** @type {Namespace[]} */
 export const PRESYNC_NAMESPACES = ['auth', 'config', 'blobIndex']
 
+// After at least one auth core is replicating, wait this long before sending
+// auth core keys, to give the peer time to start replicating the auth cores it
+// already has, so that we only send keys the peer is missing.
+const SEND_KEYS_WAIT_MS = 1000
+
 export class PeerSyncController {
   #replicatingCores = new Set()
   /** @type {Set<Namespace>} */
   #enabledNamespaces = new Set()
   #coreManager
-  #protomux
+  #creatorCorePeer
   #capabilities
   /** @type {Record<Namespace, SyncCapability>} */
   #syncCapability = createNamespaceMap('unknown')
@@ -26,33 +29,42 @@ export class PeerSyncController {
   #prevLocalState = createNamespaceMap(null)
   /** @type {SyncStatus} */
   #syncStatus = createNamespaceMap('unknown')
-  /** @type {Map<Namespace, ReturnType<Hypercore['download']>>} */
-  #downloadingRanges = new Map()
   /** @type {SyncStatus} */
   #prevSyncStatus = createNamespaceMap('unknown')
+  /** @type {NodeJS.Timeout | undefined } */
+  #sentKeysTimeoutId
+  #hasSentHaves = createNamespaceMap(false)
   #log
+  #syncState
 
   /**
    * @param {object} opts
-   * @param {import("protomux")} opts.protomux
+   * @param {import('./sync-api.js').Peer} opts.creatorCorePeer
    * @param {import("../core-manager/index.js").CoreManager} opts.coreManager
    * @param {import("./sync-state.js").SyncState} opts.syncState
    * @param {import("../capabilities.js").Capabilities} opts.capabilities
    * @param {Logger} [opts.logger]
    */
-  constructor({ protomux, coreManager, syncState, capabilities, logger }) {
+  constructor({
+    creatorCorePeer,
+    coreManager,
+    syncState,
+    capabilities,
+    logger,
+  }) {
     // @ts-ignore
     this.#log = (formatter, ...args) => {
       const log = Logger.create('peer', logger).log
       return log.apply(null, [
         `[%h] ${formatter}`,
-        protomux.stream.remotePublicKey,
+        creatorCorePeer.remotePublicKey,
         ...args,
       ])
     }
     this.#coreManager = coreManager
-    this.#protomux = protomux
+    this.#creatorCorePeer = creatorCorePeer
     this.#capabilities = capabilities
+    this.#syncState = syncState
 
     // Always need to replicate the project creator core
     this.#replicateCore(coreManager.creatorCore)
@@ -64,7 +76,7 @@ export class PeerSyncController {
   }
 
   get peerKey() {
-    return this.#protomux.stream.remotePublicKey
+    return this.#creatorCorePeer.remotePublicKey
   }
 
   get peerId() {
@@ -99,24 +111,26 @@ export class PeerSyncController {
    */
   handleDiscoveryKey(discoveryKey) {
     const coreRecord = this.#coreManager.getCoreByDiscoveryKey(discoveryKey)
-    // If we already know about this core, then we will add it to the
-    // replication stream when we are ready
-    if (coreRecord) {
-      this.#log(
-        'Received discovery key %h, but already have core in namespace %s',
-        discoveryKey,
-        coreRecord.namespace
-      )
-      if (this.#enabledNamespaces.has(coreRecord.namespace)) {
-        this.#replicateCore(coreRecord.core)
-      }
-      return
+    // If we don't have the core record, we'll add and replicate it when we
+    // receive the core key via an extension or from a core ownership record.
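+    // (Auth core keys arrive via CoreManager#sendAuthCoreKeys; cores for other
+    // namespaces are added by SyncApi from coreOwnership records.)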
+    if (!coreRecord) return
+
+    this.#log(
+      'Received discovery key %h, but already have core in namespace %s',
+      discoveryKey,
+      coreRecord.namespace
+    )
+    if (this.#enabledNamespaces.has(coreRecord.namespace)) {
+      this.#replicateCore(coreRecord.core)
     }
-    if (!this.peerKey) {
-      this.#log('Unexpected null peerKey')
-      return
+  }
+
+  destroy() {
+    if (this.#sentKeysTimeoutId) {
+      clearTimeout(this.#sentKeysTimeoutId)
     }
-    this.#coreManager.requestCoreKey(this.peerKey, discoveryKey)
+    this.#coreManager.off('add-core', this.#handleAddCore)
+    this.#syncState.off('state', this.#handleStateChange)
   }
 
   /**
@@ -137,11 +151,17 @@ export class PeerSyncController {
    * @param {import("./sync-state.js").State} state
    */
   #handleStateChange = async (state) => {
-    // The remotePublicKey is only available after the noise stream has
-    // connected. We shouldn't get a state change before the noise stream has
-    // connected, but if we do we can ignore it because we won't have any useful
-    // information until it connects.
-    if (!this.peerId) return
+    // Once we are replicating with at least one auth core, wait a bit for other
+    // auth cores to replicate, then send the keys for any missing auth cores to
+    // the peer.
+    if (
+      !this.#sentKeysTimeoutId &&
+      Object.keys(state.auth.remoteStates).length > 0
+    ) {
+      this.#sentKeysTimeoutId = setTimeout(() => {
+        this.#coreManager.sendAuthCoreKeys(this.#creatorCorePeer)
+      }, SEND_KEYS_WAIT_MS)
+    }
     this.#syncStatus = getSyncStatus(this.peerId, state)
     const localState = mapObject(state, (ns, nsState) => {
       return [ns, nsState.localState]
     })
@@ -176,9 +196,18 @@ export class PeerSyncController {
     }
     this.#log('capability %o', this.#syncCapability)
 
-    // If any namespace has new data, update what is enabled
-    if (Object.values(didUpdate).indexOf(true) > -1) {
-      this.#updateEnabledNamespaces()
+    // Stop here if no updates
+    if (Object.values(didUpdate).indexOf(true) === -1) return
+
+    this.#updateEnabledNamespaces()
+
+    // Send pre-haves for any namespaces that the peer is allowed to sync
+    for (const ns of NAMESPACES) {
+      if (ns === 'auth') continue
+      if (this.#hasSentHaves[ns]) continue
+      if (this.#syncCapability[ns] !== 'allowed') continue
+      this.#coreManager.sendHaves(this.#creatorCorePeer, ns)
+      this.#hasSentHaves[ns] = true
     }
   }
 
@@ -225,7 +254,7 @@ export class PeerSyncController {
   #replicateCore(core) {
     if (this.#replicatingCores.has(core)) return
     this.#log('replicating core %k', core.key)
-    core.replicate(this.#protomux)
+    core.replicate(this.#creatorCorePeer.protomux)
     core.on('peer-remove', (peer) => {
       if (!peer.remotePublicKey.equals(this.peerKey)) return
       this.#log('peer-remove %h from core %k', peer.remotePublicKey, core.key)
@@ -239,7 +268,7 @@ export class PeerSyncController {
   #unreplicateCore(core) {
     if (core === this.#coreManager.creatorCore) return
     const peerToUnreplicate = core.peers.find(
-      (peer) => peer.protomux === this.#protomux
+      (peer) => peer.protomux === this.#creatorCorePeer.protomux
     )
     if (!peerToUnreplicate) return
     this.#log('unreplicating core %k', core.key)
diff --git a/src/sync/sync-api.js b/src/sync/sync-api.js
index 018de6e9b..977be7234 100644
--- a/src/sync/sync-api.js
+++ b/src/sync/sync-api.js
@@ -10,6 +10,13 @@ import { keyToId } from '../utils.js'
 
 export const kHandleDiscoveryKey = Symbol('handle discovery key')
 
+/**
+ * @typedef {{
+ *   protomux: import('protomux'),
+ *   remotePublicKey: Buffer
+ * }} Peer
+ */
+
 /**
  * @typedef {object} SyncEvents
  * @property {(syncState: import('./sync-state.js').State) => void} sync-state
@@ -31,26 +38,42 @@ export class SyncApi extends TypedEmitter {
   /** @type {Map<import('protomux'), Set<string>>} */
   #pendingDiscoveryKeys = new Map()
   #l
+  #coreOwnership
 
   /**
    *
    * @param {object} opts
    * @param {import('../core-manager/index.js').CoreManager} opts.coreManager
    * @param {import("../capabilities.js").Capabilities} opts.capabilities
+   * @param {import('../core-ownership.js').CoreOwnership} opts.coreOwnership
+   * @param {import('../index-writer/index.js').IndexWriter} opts.indexWriter
    * @param {number} [opts.throttleMs]
    * @param {Logger} [opts.logger]
    */
-  constructor({ coreManager, throttleMs = 200, capabilities, logger }) {
+  constructor({
+    coreManager,
+    capabilities,
+    coreOwnership,
+    indexWriter,
+    throttleMs = 200,
+    logger,
+  }) {
     super()
     this.#l = Logger.create('syncApi', logger)
     this.#coreManager = coreManager
     this.#capabilities = capabilities
+    this.#coreOwnership = coreOwnership
     this.syncState = new SyncState({ coreManager, throttleMs })
     this.syncState.setMaxListeners(0)
     this.syncState.on('state', this.emit.bind(this, 'sync-state'))
 
     this.#coreManager.creatorCore.on('peer-add', this.#handlePeerAdd)
     this.#coreManager.creatorCore.on('peer-remove', this.#handlePeerRemove)
+
+    indexWriter.subscribe({ schemaName: 'role' }, this.#handleRoleUpdate)
+    indexWriter.subscribe(
+      { schemaName: 'coreOwnership' },
+      this.#handleCoreOwnershipUpdate
+    )
   }
 
   /** @type {import('../local-peers.js').LocalPeersEvents['discovery-key']} */
@@ -133,19 +156,19 @@ export class SyncApi extends TypedEmitter {
    * will then handle validation of role records to ensure that the peer is
    * actually still part of the project.
    *
-   * @param {{ protomux: import('protomux') }} peer
+   * @param {Peer} peer
    */
   #handlePeerAdd = (peer) => {
     const { protomux } = peer
     if (this.#peerSyncControllers.has(protomux)) {
       this.#l.log(
         'Unexpected existing peer sync controller for peer %h',
-        protomux.stream.remotePublicKey
+        peer.remotePublicKey
       )
       return
     }
     const peerSyncController = new PeerSyncController({
-      protomux,
+      creatorCorePeer: peer,
      coreManager: this.#coreManager,
       syncState: this.syncState,
       capabilities: this.#capabilities,
@@ -173,21 +196,67 @@ export class SyncApi extends TypedEmitter {
   /**
    * Called when a peer is removed from the creator core, e.g. when the
    * connection is terminated.
    *
-   * @param {{ protomux: import('protomux'), remotePublicKey: Buffer }} peer
+   * @param {Peer} peer
    */
   #handlePeerRemove = (peer) => {
     const { protomux } = peer
-    if (!this.#peerSyncControllers.has(protomux)) {
+    const psc = this.#peerSyncControllers.get(protomux)
+    if (!psc) {
       this.#l.log(
         'Unexpected no existing peer sync controller for peer %h',
         protomux.stream.remotePublicKey
       )
       return
     }
+    psc.destroy()
     this.#peerSyncControllers.delete(protomux)
     this.#peerIds.delete(keyToId(peer.remotePublicKey))
     this.#pendingDiscoveryKeys.delete(protomux)
   }
+
+  /**
+   * @param {import('@mapeo/schema').Role[]} roleDocs
+   */
+  #handleRoleUpdate = async (roleDocs) => {
+    // We add cores for any device that has a role doc, even if blocked
+    for (const roleDoc of roleDocs) {
+      try {
+        this.#addCores(await this.#coreOwnership.get(roleDoc.docId))
+        this.#l.log('Added cores for device %S', roleDoc.docId)
+      } catch (e) {
+        // Ignore, we'll add these when the coreOwnership is added
+        this.#l.log('No coreOwnership for device %S', roleDoc.docId)
+      }
+    }
+  }
+
+  /**
+   * @param {import('@mapeo/schema').CoreOwnership[]} coreOwnershipDocs
+   */
+  #handleCoreOwnershipUpdate = async (coreOwnershipDocs) => {
+    for (const coreOwnershipDoc of coreOwnershipDocs) {
+      try {
+        // We don't actually need the role, we just need to check if it exists
+        await this.#capabilities.getCapabilities(coreOwnershipDoc.docId)
+        this.#addCores(coreOwnershipDoc)
+        this.#l.log('Added cores for device %S', coreOwnershipDoc.docId)
+      } catch (e) {
+        // Ignore, we'll add these when the role is added
+        this.#l.log('No role for device %S', coreOwnershipDoc.docId)
+      }
+    }
+  }
+
+  /**
+   * @param {import('@mapeo/schema').CoreOwnership} coreOwnership
+   */
+  #addCores(coreOwnership) {
+    for (const ns of NAMESPACES) {
+      if (ns === 'auth') continue
+      const coreKey = Buffer.from(coreOwnership[`${ns}CoreId`], 'hex')
+      this.#coreManager.addCore(coreKey, ns)
+    }
+  }
 }
 
 /**
diff --git a/test-e2e/sync.js b/test-e2e/sync.js
index ffd9fe6f0..3fd1ada5f 100644
--- a/test-e2e/sync.js
+++ b/test-e2e/sync.js
@@ -190,71 +190,65 @@ test('shares cores', async function (t) {
   }
 })
 
-test('no sync capabilities === no namespaces sync apart from auth', async (t) => {
-  const COUNT = 3
-  const managers = await createManagers(COUNT, t)
-  const [invitor, invitee, blocked] = managers
-  const disconnect1 = connectPeers(managers, { discovery: false })
-  const projectId = await invitor.createProject()
-  await invite({
-    invitor,
-    invitees: [blocked],
-    projectId,
-    roleId: BLOCKED_ROLE_ID,
-  })
-  await invite({
-    invitor,
-    invitees: [invitee],
-    projectId,
-    roleId: COORDINATOR_ROLE_ID,
-  })
-
-  const projects = await Promise.all(
-    managers.map((m) => m.getProject(projectId))
-  )
-  const [invitorProject, inviteeProject] = projects
+test(
+  'no sync capabilities === no namespaces sync apart from auth',
+  async (t) => {
+    const COUNT = 3
+    const managers = await createManagers(COUNT, t)
+    const [invitor, invitee, blocked] = managers
+    const disconnect1 = connectPeers(managers, { discovery: false })
+    const projectId = await invitor.createProject()
+    await invite({
+      invitor,
+      invitees: [blocked],
+      projectId,
+      roleId: BLOCKED_ROLE_ID,
+    })
+    await invite({
+      invitor,
+      invitees: [invitee],
+      projectId,
+      roleId: COORDINATOR_ROLE_ID,
+    })
+
+    const projects = await Promise.all(
+      managers.map((m) => m.getProject(projectId))
+    )
+    const [invitorProject, inviteeProject] = projects
+
+    const generatedDocs = (await seedDatabases([inviteeProject])).flat()
+    const configDocsCount = generatedDocs.filter(
+      (doc) => doc.schemaName !== 'observation'
+    ).length
+    const dataDocsCount = generatedDocs.length - configDocsCount
 
-  const generatedDocs = (await seedDatabases([inviteeProject])).flat()
-  const configDocsCount = generatedDocs.filter(
-    (doc) => doc.schemaName !== 'observation'
-  ).length
-  const dataDocsCount = generatedDocs.length - configDocsCount
+    for (const project of projects) {
+      project.$sync.start()
+    }
 
-  for (const project of projects) {
-    project.$sync.start()
-  }
+    await waitForSync([inviteeProject, invitorProject], 'full')
 
-  await waitForSync([inviteeProject, invitorProject], 'full')
+    const [invitorState, inviteeState, blockedState] = projects.map((p) =>
+      p.$sync.getState()
+    )
 
-  const [invitorState, inviteeState, blockedState] = projects.map((p) =>
-    p.$sync.getState()
-  )
+    t.is(invitorState.config.localState.have, configDocsCount + COUNT) // count device info doc for each invited device
+    t.is(invitorState.data.localState.have, dataDocsCount)
+    t.is(blockedState.config.localState.have, 1) // just the device info doc
+    t.is(blockedState.data.localState.have, 0) // no data docs synced
 
-  t.is(invitorState.config.localState.have, configDocsCount + COUNT) // count device info doc for each invited device
-  t.is(invitorState.data.localState.have, dataDocsCount)
-  t.is(blockedState.config.localState.have, 1) // just the device info doc
-  t.is(blockedState.data.localState.have, 0) // no data docs synced
+    // Temp fix until we have .close() method - waits for indexing idle to ensure
+    // we don't close storage in teardown while index is still being written.
+    await Promise.all(projects.map((p) => p.$getProjectSettings()))
 
-  for (const ns of NAMESPACES) {
-    if (ns === 'auth') {
-      t.is(invitorState[ns].coreCount, 3)
-      t.is(inviteeState[ns].coreCount, 3)
-      t.is(blockedState[ns].coreCount, 3)
-    } else if (PRESYNC_NAMESPACES.includes(ns)) {
+    for (const ns of NAMESPACES) {
       t.is(invitorState[ns].coreCount, 3)
       t.is(inviteeState[ns].coreCount, 3)
-      t.is(blockedState[ns].coreCount, 1)
-    } else {
-      t.is(invitorState[ns].coreCount, 2)
-      t.is(inviteeState[ns].coreCount, 2)
-      t.is(blockedState[ns].coreCount, 1)
+      t.is(blockedState[ns].coreCount, 3, ns)
+      t.alike(invitorState[ns].localState, inviteeState[ns].localState)
     }
-    t.alike(invitorState[ns].localState, inviteeState[ns].localState)
-  }
-
-  // Temp fix until we have .close() method - waits for indexing idle to ensure
-  // we don't close storage in teardown while index is still being written.
-  await Promise.all(projects.map((p) => p.$getProjectSettings()))
-  await disconnect1()
-})
+    await disconnect1()
+  }
+)
diff --git a/tests/datastore.js b/tests/datastore.js
index cfde7a65f..e652a03cd 100644
--- a/tests/datastore.js
+++ b/tests/datastore.js
@@ -91,12 +91,12 @@ test('index events', async (t) => {
     },
     storage: () => new RAM(),
   })
-  dataStore.on('index-state', (state) => {
+  dataStore.indexer.on('index-state', (state) => {
     // eslint-disable-next-line no-unused-vars
     const { entriesPerSecond, ...rest } = state
     indexStates.push(rest)
   })
-  const idlePromise = once(dataStore, 'idle')
+  const idlePromise = once(dataStore.indexer, 'idle')
   await dataStore.write(obs)
   await idlePromise
   const expectedStates = [