diff --git a/proto/extensions.proto b/proto/extensions.proto index 344b334b4..7bab5f61e 100644 --- a/proto/extensions.proto +++ b/proto/extensions.proto @@ -1,13 +1,7 @@ syntax = "proto3"; message ProjectExtension { - repeated bytes wantCoreKeys = 1; - - repeated bytes authCoreKeys = 2; - repeated bytes configCoreKeys = 3; - repeated bytes dataCoreKeys = 4; - repeated bytes blobIndexCoreKeys = 5; - repeated bytes blobCoreKeys = 6; + repeated bytes authCoreKeys = 1; } message HaveExtension { diff --git a/src/core-manager/index.js b/src/core-manager/index.js index 86d58a6c6..68c9a3261 100644 --- a/src/core-manager/index.js +++ b/src/core-manager/index.js @@ -3,17 +3,16 @@ import Corestore from 'corestore' import { debounce } from 'throttle-debounce' import assert from 'node:assert/strict' import { sql, eq } from 'drizzle-orm' -import { discoveryKey } from 'hypercore-crypto' -import Hypercore from 'hypercore' import { HaveExtension, ProjectExtension } from '../generated/extensions.js' import { Logger } from '../logger.js' import { NAMESPACES } from '../constants.js' -import { keyToId, noop } from '../utils.js' +import { noop } from '../utils.js' import { coresTable } from '../schema/project.js' import * as rle from './bitfield-rle.js' import { CoreIndex } from './core-index.js' +/** @import Hypercore from 'hypercore' */ /** @import { HypercorePeer, Namespace } from '../types.js' */ const WRITER_CORE_PREHAVES_DEBOUNCE_DELAY = 1000 @@ -46,12 +45,6 @@ export class CoreManager extends TypedEmitter { #haveExtension #deviceId #l - /** - * We use this to reduce network traffic caused by requesting the same key - * from multiple clients. - * TODO: Remove items from this set after a max age - */ - #keyRequests = new TrackedKeyRequests() #autoDownload static get namespaces() { @@ -152,8 +145,8 @@ export class CoreManager extends TypedEmitter { 'mapeo/project', { encoding: ProjectExtensionCodec, - onmessage: (msg, peer) => { - this.#handleProjectMessage(msg, peer) + onmessage: (msg) => { + this.#handleProjectMessage(msg) }, } ) @@ -169,12 +162,7 @@ export class CoreManager extends TypedEmitter { this.#sendHaves(peer, this.#coreIndex).catch(() => { this.#l.log('Failed to send pre-haves to newly-connected peer') }) - }) - this.#creatorCore.on('peer-remove', (peer) => { - // When a peer is removed we clean up any unanswered key requests, so that - // we will request from a different peer, and to avoid the tracking of key - // requests growing without bounds. - this.#keyRequests.deleteByPeerKey(peer.remotePublicKey) + this.#sendAuthCoreKeys(peer) }) this.#ready = Promise.all( @@ -251,7 +239,6 @@ export class CoreManager extends TypedEmitter { */ async close() { this.#state = 'closing' - this.#keyRequests.clear() const promises = [] for (const { core } of this.#coreIndex) { promises.push(core.close()) @@ -268,6 +255,7 @@ export class CoreManager extends TypedEmitter { * @returns {CoreRecord} */ addCore(key, namespace) { + this.#l.log('Adding remote core %k to %s', key, namespace) return this.#addCore({ publicKey: key }, namespace, true) } @@ -359,69 +347,31 @@ export class CoreManager extends TypedEmitter { } /** - * Send an extension message over the project creator core replication stream - * requesting a core key for the given discovery key. + * We only add auth cores from the project extension messages. Cores in other + * namespaces are added by the sync API from the core ownership docs * - * @param {Buffer} peerKey - * @param {Buffer} discoveryKey + * @param {ProjectExtension} msg */ - requestCoreKey(peerKey, discoveryKey) { - // No-op if we already have this core - if (this.getCoreByDiscoveryKey(discoveryKey)) return - const peer = this.#creatorCore.peers.find((peer) => { - return peer.remotePublicKey.equals(peerKey) - }) - if (!peer) { - // This should not happen because this is only called from SyncApi, which - // checks the peer exists before calling this method. - this.#l.log( - 'Attempted to request core key for %h, but no connected peer %h', - discoveryKey, - peerKey - ) - return + #handleProjectMessage({ authCoreKeys }) { + for (const authCoreKey of authCoreKeys) { + // Use public method - these must be persisted (private method defaults to persisted=false) + this.addCore(authCoreKey, 'auth') } - // Only request a key once, e.g. from the peer we first receive it from (we - // can assume that a peer must have the key if we see the discovery key in - // the protomux). This is necessary to reduce network traffic for many newly - // connected peers - otherwise duplicate requests will be sent to every peer - if (this.#keyRequests.has(discoveryKey)) return - this.#keyRequests.set(discoveryKey, peerKey) - - this.#l.log( - 'Requesting core key for discovery key %h from peer %h', - discoveryKey, - peerKey - ) - const message = ProjectExtension.fromPartial({ - wantCoreKeys: [discoveryKey], - }) - this.#projectExtension.send(message, peer) } /** - * @param {ProjectExtension} msg + * Sends auth core keys to the given peer, skipping any keys that we know the + * peer has already (depends on the peer having already replicated the auth + * cores it has) + * * @param {HypercorePeer} peer */ - #handleProjectMessage({ wantCoreKeys, ...coreKeys }, peer) { + #sendAuthCoreKeys(peer) { const message = ProjectExtension.create() - let hasKeys = false - for (const discoveryKey of wantCoreKeys) { - const coreRecord = this.getCoreByDiscoveryKey(discoveryKey) - if (!coreRecord) continue - message[`${coreRecord.namespace}CoreKeys`].push(coreRecord.key) - hasKeys = true - } - if (hasKeys) { - this.#projectExtension.send(message, peer) - } - for (const namespace of NAMESPACES) { - for (const coreKey of coreKeys[`${namespace}CoreKeys`]) { - // Use public method - these must be persisted (private method defaults to persisted=false) - this.addCore(coreKey, namespace) - this.#keyRequests.deleteByDiscoveryKey(discoveryKey(coreKey)) - } + for (const { key } of this.getCores('auth')) { + message.authCoreKeys.push(key) } + this.#projectExtension.send(message, peer) } /** @@ -478,21 +428,14 @@ export class CoreManager extends TypedEmitter { * ONLY FOR TESTING * Replicate all cores in core manager * + * NB: Remote peers need to be manually added when unit testing core manager + * without the Sync API + * * @param {Parameters[0]} stream */ [kCoreManagerReplicate](stream) { - const protocolStream = Hypercore.createProtocolStream(stream, { - ondiscoverykey: async (discoveryKey) => { - const peer = await findPeer( - this.creatorCore, - // @ts-ignore - protocolStream.noiseStream.remotePublicKey - ) - if (!peer) return - this.requestCoreKey(peer.remotePublicKey, discoveryKey) - }, - }) - return this.#corestore.replicate(stream) + const protocolStream = this.#corestore.replicate(stream) + return protocolStream } /** @@ -559,93 +502,3 @@ const HaveExtensionCodec = { } }, } - -class TrackedKeyRequests { - /** @type {Map} */ - #byDiscoveryId = new Map() - /** @type {Map>} */ - #byPeerId = new Map() - - /** - * @param {Buffer} discoveryKey - * @param {Buffer} peerKey - */ - set(discoveryKey, peerKey) { - const discoveryId = keyToId(discoveryKey) - const peerId = keyToId(peerKey) - const existingForPeer = this.#byPeerId.get(peerId) || new Set() - this.#byDiscoveryId.set(discoveryId, peerId) - existingForPeer.add(discoveryId) - this.#byPeerId.set(peerId, existingForPeer) - return this - } - /** - * @param {Buffer} discoveryKey - */ - has(discoveryKey) { - const discoveryId = keyToId(discoveryKey) - return this.#byDiscoveryId.has(discoveryId) - } - /** - * @param {Buffer} discoveryKey - */ - deleteByDiscoveryKey(discoveryKey) { - const discoveryId = keyToId(discoveryKey) - const peerId = this.#byDiscoveryId.get(discoveryId) - if (!peerId) return false - this.#byDiscoveryId.delete(discoveryId) - const existingForPeer = this.#byPeerId.get(peerId) - if (existingForPeer) { - existingForPeer.delete(discoveryId) - } - return true - } - /** - * @param {Buffer} peerKey - */ - deleteByPeerKey(peerKey) { - const peerId = keyToId(peerKey) - const existingForPeer = this.#byPeerId.get(peerId) - if (!existingForPeer) return - for (const discoveryId of existingForPeer) { - this.#byDiscoveryId.delete(discoveryId) - } - this.#byPeerId.delete(peerId) - } - clear() { - this.#byDiscoveryId.clear() - this.#byPeerId.clear() - } -} - -/** - * @param {Hypercore<"binary", Buffer>} core - * @param {Buffer} publicKey - * @param {{ timeout?: number }} [opts] - */ -function findPeer(core, publicKey, { timeout = 200 } = {}) { - const peer = core.peers.find((peer) => { - return peer.remotePublicKey.equals(publicKey) - }) - if (peer) return peer - // This is called from the from the handleDiscoveryId event, which can - // happen before the peer connection is fully established, so we wait for - // the `peer-add` event, with a timeout in case the peer never gets added - return new Promise(function (res) { - const timeoutId = setTimeout(function () { - core.off('peer-add', onPeer) - res(null) - }, timeout) - - core.on('peer-add', onPeer) - - /** @param {HypercorePeer} peer */ - function onPeer(peer) { - if (peer.remotePublicKey.equals(publicKey)) { - clearTimeout(timeoutId) - core.off('peer-add', onPeer) - res(peer) - } - } - }) -} diff --git a/src/core-ownership.js b/src/core-ownership.js index 5d74432c7..f445d2fc2 100644 --- a/src/core-ownership.js +++ b/src/core-ownership.js @@ -116,6 +116,11 @@ export class CoreOwnership extends TypedEmitter { return this.#dataType.getByDocId(deviceId) } + async getAll() { + await this.#ownershipWriteDone + return this.#dataType.getMany() + } + /** * * @param {KeyPair} identityKeypair diff --git a/src/generated/extensions.d.ts b/src/generated/extensions.d.ts index 43d4b4c46..f8841e2ad 100644 --- a/src/generated/extensions.d.ts +++ b/src/generated/extensions.d.ts @@ -1,12 +1,6 @@ -/// import _m0 from "protobufjs/minimal.js"; export interface ProjectExtension { - wantCoreKeys: Buffer[]; authCoreKeys: Buffer[]; - configCoreKeys: Buffer[]; - dataCoreKeys: Buffer[]; - blobIndexCoreKeys: Buffer[]; - blobCoreKeys: Buffer[]; } export interface HaveExtension { discoveryKey: Buffer; @@ -28,60 +22,23 @@ export declare function haveExtension_NamespaceToNumber(object: HaveExtension_Na export declare const ProjectExtension: { encode(message: ProjectExtension, writer?: _m0.Writer): _m0.Writer; decode(input: _m0.Reader | Uint8Array, length?: number): ProjectExtension; - create]: never; }; - authCoreKeys?: Buffer[] & Buffer[] & { [K_1 in Exclude]: never; }; - configCoreKeys?: Buffer[] & Buffer[] & { [K_2 in Exclude]: never; }; - dataCoreKeys?: Buffer[] & Buffer[] & { [K_3 in Exclude]: never; }; - blobIndexCoreKeys?: Buffer[] & Buffer[] & { [K_4 in Exclude]: never; }; - blobCoreKeys?: Buffer[] & Buffer[] & { [K_5 in Exclude]: never; }; - } & { [K_6 in Exclude]: never; }>(base?: I): ProjectExtension; - fromPartial]: never; }; - authCoreKeys?: Buffer[] & Buffer[] & { [K_8 in Exclude]: never; }; - configCoreKeys?: Buffer[] & Buffer[] & { [K_9 in Exclude]: never; }; - dataCoreKeys?: Buffer[] & Buffer[] & { [K_10 in Exclude]: never; }; - blobIndexCoreKeys?: Buffer[] & Buffer[] & { [K_11 in Exclude]: never; }; - blobCoreKeys?: Buffer[] & Buffer[] & { [K_12 in Exclude]: never; }; - } & { [K_13 in Exclude]: never; }>(object: I_1): ProjectExtension; + create, I>>(base?: I): ProjectExtension; + fromPartial, I>>(object: I): ProjectExtension; }; export declare const HaveExtension: { encode(message: HaveExtension, writer?: _m0.Writer): _m0.Writer; decode(input: _m0.Reader | Uint8Array, length?: number): HaveExtension; - create]: never; }>(base?: I): HaveExtension; - fromPartial]: never; }>(object: I_1): HaveExtension; + create, I>>(base?: I): HaveExtension; + fromPartial, I>>(object: I): HaveExtension; }; +type Builtin = Date | Function | Uint8Array | string | number | boolean | undefined; +type DeepPartial = T extends Builtin ? T : T extends Array ? Array> : T extends ReadonlyArray ? ReadonlyArray> : T extends {} ? { + [K in keyof T]?: DeepPartial; +} : Partial; +type KeysOfUnion = T extends T ? keyof T : never; +type Exact = P extends Builtin ? P : P & { + [K in keyof P]: Exact; +} & { + [K in Exclude>]: never; +}; +export {}; diff --git a/src/generated/extensions.js b/src/generated/extensions.js index c51e6dae7..088e1cc22 100644 --- a/src/generated/extensions.js +++ b/src/generated/extensions.js @@ -50,42 +50,15 @@ export function haveExtension_NamespaceToNumber(object) { } } function createBaseProjectExtension() { - return { - wantCoreKeys: [], - authCoreKeys: [], - configCoreKeys: [], - dataCoreKeys: [], - blobIndexCoreKeys: [], - blobCoreKeys: [], - }; + return { authCoreKeys: [] }; } export var ProjectExtension = { encode: function (message, writer) { if (writer === void 0) { writer = _m0.Writer.create(); } - for (var _i = 0, _a = message.wantCoreKeys; _i < _a.length; _i++) { + for (var _i = 0, _a = message.authCoreKeys; _i < _a.length; _i++) { var v = _a[_i]; writer.uint32(10).bytes(v); } - for (var _b = 0, _c = message.authCoreKeys; _b < _c.length; _b++) { - var v = _c[_b]; - writer.uint32(18).bytes(v); - } - for (var _d = 0, _e = message.configCoreKeys; _d < _e.length; _d++) { - var v = _e[_d]; - writer.uint32(26).bytes(v); - } - for (var _f = 0, _g = message.dataCoreKeys; _f < _g.length; _f++) { - var v = _g[_f]; - writer.uint32(34).bytes(v); - } - for (var _h = 0, _j = message.blobIndexCoreKeys; _h < _j.length; _h++) { - var v = _j[_h]; - writer.uint32(42).bytes(v); - } - for (var _k = 0, _l = message.blobCoreKeys; _k < _l.length; _k++) { - var v = _l[_k]; - writer.uint32(50).bytes(v); - } return writer; }, decode: function (input, length) { @@ -99,38 +72,8 @@ export var ProjectExtension = { if (tag !== 10) { break; } - message.wantCoreKeys.push(reader.bytes()); - continue; - case 2: - if (tag !== 18) { - break; - } message.authCoreKeys.push(reader.bytes()); continue; - case 3: - if (tag !== 26) { - break; - } - message.configCoreKeys.push(reader.bytes()); - continue; - case 4: - if (tag !== 34) { - break; - } - message.dataCoreKeys.push(reader.bytes()); - continue; - case 5: - if (tag !== 42) { - break; - } - message.blobIndexCoreKeys.push(reader.bytes()); - continue; - case 6: - if (tag !== 50) { - break; - } - message.blobCoreKeys.push(reader.bytes()); - continue; } if ((tag & 7) === 4 || tag === 0) { break; @@ -143,14 +86,9 @@ export var ProjectExtension = { return ProjectExtension.fromPartial(base !== null && base !== void 0 ? base : {}); }, fromPartial: function (object) { - var _a, _b, _c, _d, _e, _f; + var _a; var message = createBaseProjectExtension(); - message.wantCoreKeys = ((_a = object.wantCoreKeys) === null || _a === void 0 ? void 0 : _a.map(function (e) { return e; })) || []; - message.authCoreKeys = ((_b = object.authCoreKeys) === null || _b === void 0 ? void 0 : _b.map(function (e) { return e; })) || []; - message.configCoreKeys = ((_c = object.configCoreKeys) === null || _c === void 0 ? void 0 : _c.map(function (e) { return e; })) || []; - message.dataCoreKeys = ((_d = object.dataCoreKeys) === null || _d === void 0 ? void 0 : _d.map(function (e) { return e; })) || []; - message.blobIndexCoreKeys = ((_e = object.blobIndexCoreKeys) === null || _e === void 0 ? void 0 : _e.map(function (e) { return e; })) || []; - message.blobCoreKeys = ((_f = object.blobCoreKeys) === null || _f === void 0 ? void 0 : _f.map(function (e) { return e; })) || []; + message.authCoreKeys = ((_a = object.authCoreKeys) === null || _a === void 0 ? void 0 : _a.map(function (e) { return e; })) || []; return message; }, }; diff --git a/src/generated/extensions.ts b/src/generated/extensions.ts index 6c3d45a00..eddb7efeb 100644 --- a/src/generated/extensions.ts +++ b/src/generated/extensions.ts @@ -3,12 +3,7 @@ import Long from "long"; import _m0 from "protobufjs/minimal.js"; export interface ProjectExtension { - wantCoreKeys: Buffer[]; authCoreKeys: Buffer[]; - configCoreKeys: Buffer[]; - dataCoreKeys: Buffer[]; - blobIndexCoreKeys: Buffer[]; - blobCoreKeys: Buffer[]; } export interface HaveExtension { @@ -72,35 +67,13 @@ export function haveExtension_NamespaceToNumber(object: HaveExtension_Namespace) } function createBaseProjectExtension(): ProjectExtension { - return { - wantCoreKeys: [], - authCoreKeys: [], - configCoreKeys: [], - dataCoreKeys: [], - blobIndexCoreKeys: [], - blobCoreKeys: [], - }; + return { authCoreKeys: [] }; } export const ProjectExtension = { encode(message: ProjectExtension, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { - for (const v of message.wantCoreKeys) { - writer.uint32(10).bytes(v!); - } for (const v of message.authCoreKeys) { - writer.uint32(18).bytes(v!); - } - for (const v of message.configCoreKeys) { - writer.uint32(26).bytes(v!); - } - for (const v of message.dataCoreKeys) { - writer.uint32(34).bytes(v!); - } - for (const v of message.blobIndexCoreKeys) { - writer.uint32(42).bytes(v!); - } - for (const v of message.blobCoreKeys) { - writer.uint32(50).bytes(v!); + writer.uint32(10).bytes(v!); } return writer; }, @@ -117,43 +90,8 @@ export const ProjectExtension = { break; } - message.wantCoreKeys.push(reader.bytes() as Buffer); - continue; - case 2: - if (tag !== 18) { - break; - } - message.authCoreKeys.push(reader.bytes() as Buffer); continue; - case 3: - if (tag !== 26) { - break; - } - - message.configCoreKeys.push(reader.bytes() as Buffer); - continue; - case 4: - if (tag !== 34) { - break; - } - - message.dataCoreKeys.push(reader.bytes() as Buffer); - continue; - case 5: - if (tag !== 42) { - break; - } - - message.blobIndexCoreKeys.push(reader.bytes() as Buffer); - continue; - case 6: - if (tag !== 50) { - break; - } - - message.blobCoreKeys.push(reader.bytes() as Buffer); - continue; } if ((tag & 7) === 4 || tag === 0) { break; @@ -168,12 +106,7 @@ export const ProjectExtension = { }, fromPartial, I>>(object: I): ProjectExtension { const message = createBaseProjectExtension(); - message.wantCoreKeys = object.wantCoreKeys?.map((e) => e) || []; message.authCoreKeys = object.authCoreKeys?.map((e) => e) || []; - message.configCoreKeys = object.configCoreKeys?.map((e) => e) || []; - message.dataCoreKeys = object.dataCoreKeys?.map((e) => e) || []; - message.blobIndexCoreKeys = object.blobIndexCoreKeys?.map((e) => e) || []; - message.blobCoreKeys = object.blobCoreKeys?.map((e) => e) || []; return message; }, }; diff --git a/src/generated/keys.d.ts b/src/generated/keys.d.ts index bcb402a82..de35d580c 100644 --- a/src/generated/keys.d.ts +++ b/src/generated/keys.d.ts @@ -1,4 +1,3 @@ -/// import _m0 from "protobufjs/minimal.js"; export interface EncryptionKeys { auth: Buffer; @@ -15,88 +14,23 @@ export interface ProjectKeys { export declare const EncryptionKeys: { encode(message: EncryptionKeys, writer?: _m0.Writer): _m0.Writer; decode(input: _m0.Reader | Uint8Array, length?: number): EncryptionKeys; - create]: never; }>(base?: I): EncryptionKeys; - fromPartial]: never; }>(object: I_1): EncryptionKeys; + create, I>>(base?: I): EncryptionKeys; + fromPartial, I>>(object: I): EncryptionKeys; }; export declare const ProjectKeys: { encode(message: ProjectKeys, writer?: _m0.Writer): _m0.Writer; decode(input: _m0.Reader | Uint8Array, length?: number): ProjectKeys; - create]: never; }; - } & { [K_1 in Exclude]: never; }>(base?: I): ProjectKeys; - fromPartial]: never; }; - } & { [K_3 in Exclude]: never; }>(object: I_1): ProjectKeys; + create, I>>(base?: I): ProjectKeys; + fromPartial, I>>(object: I): ProjectKeys; }; +type Builtin = Date | Function | Uint8Array | string | number | boolean | undefined; +type DeepPartial = T extends Builtin ? T : T extends Array ? Array> : T extends ReadonlyArray ? ReadonlyArray> : T extends {} ? { + [K in keyof T]?: DeepPartial; +} : Partial; +type KeysOfUnion = T extends T ? keyof T : never; +type Exact = P extends Builtin ? P : P & { + [K in keyof P]: Exact; +} & { + [K in Exclude>]: never; +}; +export {}; diff --git a/src/generated/rpc.d.ts b/src/generated/rpc.d.ts index e4e23ed3d..f1f4dd1bf 100644 --- a/src/generated/rpc.d.ts +++ b/src/generated/rpc.d.ts @@ -1,4 +1,3 @@ -/// import _m0 from "protobufjs/minimal.js"; import { EncryptionKeys } from "./keys.js"; export interface Invite { @@ -48,142 +47,41 @@ export declare function deviceInfo_DeviceTypeToNumber(object: DeviceInfo_DeviceT export declare const Invite: { encode(message: Invite, writer?: _m0.Writer): _m0.Writer; decode(input: _m0.Reader | Uint8Array, length?: number): Invite; - create]: never; }>(base?: I): Invite; - fromPartial]: never; }>(object: I_1): Invite; + create, I>>(base?: I): Invite; + fromPartial, I>>(object: I): Invite; }; export declare const InviteCancel: { encode(message: InviteCancel, writer?: _m0.Writer): _m0.Writer; decode(input: _m0.Reader | Uint8Array, length?: number): InviteCancel; - create]: never; }>(base?: I): InviteCancel; - fromPartial]: never; }>(object: I_1): InviteCancel; + create, I>>(base?: I): InviteCancel; + fromPartial, I>>(object: I): InviteCancel; }; export declare const InviteResponse: { encode(message: InviteResponse, writer?: _m0.Writer): _m0.Writer; decode(input: _m0.Reader | Uint8Array, length?: number): InviteResponse; - create]: never; }>(base?: I): InviteResponse; - fromPartial]: never; }>(object: I_1): InviteResponse; + create, I>>(base?: I): InviteResponse; + fromPartial, I>>(object: I): InviteResponse; }; export declare const ProjectJoinDetails: { encode(message: ProjectJoinDetails, writer?: _m0.Writer): _m0.Writer; decode(input: _m0.Reader | Uint8Array, length?: number): ProjectJoinDetails; - create]: never; }; - } & { [K_1 in Exclude]: never; }>(base?: I): ProjectJoinDetails; - fromPartial]: never; }; - } & { [K_3 in Exclude]: never; }>(object: I_1): ProjectJoinDetails; + create, I>>(base?: I): ProjectJoinDetails; + fromPartial, I>>(object: I): ProjectJoinDetails; }; export declare const DeviceInfo: { encode(message: DeviceInfo, writer?: _m0.Writer): _m0.Writer; decode(input: _m0.Reader | Uint8Array, length?: number): DeviceInfo; - create]: never; }>(base?: I): DeviceInfo; - fromPartial]: never; }>(object: I_1): DeviceInfo; + create, I>>(base?: I): DeviceInfo; + fromPartial, I>>(object: I): DeviceInfo; }; +type Builtin = Date | Function | Uint8Array | string | number | boolean | undefined; +type DeepPartial = T extends Builtin ? T : T extends Array ? Array> : T extends ReadonlyArray ? ReadonlyArray> : T extends {} ? { + [K in keyof T]?: DeepPartial; +} : Partial; +type KeysOfUnion = T extends T ? keyof T : never; +type Exact = P extends Builtin ? P : P & { + [K in keyof P]: Exact; +} & { + [K in Exclude>]: never; +}; +export {}; diff --git a/src/mapeo-project.js b/src/mapeo-project.js index 7e40d5bbc..d19e5d092 100644 --- a/src/mapeo-project.js +++ b/src/mapeo-project.js @@ -338,6 +338,7 @@ export class MapeoProject extends TypedEmitter { this.#syncApi = new SyncApi({ coreManager: this.#coreManager, + coreOwnership: this.#coreOwnership, roles: this.#roles, logger: this.#l, }) diff --git a/src/sync/peer-sync-controller.js b/src/sync/peer-sync-controller.js index ca10d72dd..1d9cbc6d0 100644 --- a/src/sync/peer-sync-controller.js +++ b/src/sync/peer-sync-controller.js @@ -105,13 +105,7 @@ export class PeerSyncController { if (this.#enabledNamespaces.has(coreRecord.namespace)) { this.#replicateCore(coreRecord.core) } - return - } - if (!this.peerKey) { - this.#log('Unexpected null peerKey') - return } - this.#coreManager.requestCoreKey(this.peerKey, discoveryKey) } /** diff --git a/src/sync/sync-api.js b/src/sync/sync-api.js index d08e671aa..db3570c16 100644 --- a/src/sync/sync-api.js +++ b/src/sync/sync-api.js @@ -3,7 +3,10 @@ import { SyncState } from './sync-state.js' import { PeerSyncController } from './peer-sync-controller.js' import { Logger } from '../logger.js' import { NAMESPACES, PRESYNC_NAMESPACES } from '../constants.js' -import { ExhaustivenessError, assert, keyToId } from '../utils.js' +import { ExhaustivenessError, assert, keyToId, noop } from '../utils.js' +import { NO_ROLE_ID } from '../roles.js' +/** @import { CoreOwnership as CoreOwnershipDoc } from '@mapeo/schema' */ +/** @import { CoreOwnership } from '../core-ownership.js' */ export const kHandleDiscoveryKey = Symbol('handle discovery key') export const kSyncState = Symbol('sync state') @@ -50,6 +53,7 @@ export const kRescindFullStopRequest = Symbol('foreground') */ export class SyncApi extends TypedEmitter { #coreManager + #coreOwnership #roles /** @type {Map} */ #peerSyncControllers = new Map() @@ -75,14 +79,16 @@ export class SyncApi extends TypedEmitter { * * @param {object} opts * @param {import('../core-manager/index.js').CoreManager} opts.coreManager + * @param {CoreOwnership} opts.coreOwnership * @param {import('../roles.js').Roles} opts.roles * @param {number} [opts.throttleMs] * @param {Logger} [opts.logger] */ - constructor({ coreManager, throttleMs = 200, roles, logger }) { + constructor({ coreManager, throttleMs = 200, roles, logger, coreOwnership }) { super() this.#l = Logger.create('syncApi', logger) this.#coreManager = coreManager + this.#coreOwnership = coreOwnership this.#roles = roles this[kSyncState] = new SyncState({ coreManager, @@ -96,6 +102,21 @@ export class SyncApi extends TypedEmitter { this.#coreManager.creatorCore.on('peer-add', this.#handlePeerAdd) this.#coreManager.creatorCore.on('peer-remove', this.#handlePeerRemove) + + roles.on('update', this.#handleRoleUpdate) + coreOwnership.on('update', this.#handleCoreOwnershipUpdate) + + this.#coreOwnership + .getAll() + .then((coreOwnerships) => + Promise.allSettled( + coreOwnerships.map(async (coreOwnership) => { + if (coreOwnership.docId === this.#coreManager.deviceId) return + await this.#validateRoleAndAddCoresForPeer(coreOwnership) + }) + ) + ) + .catch(noop) } /** @type {import('../local-peers.js').LocalPeersEvents['discovery-key']} */ @@ -391,6 +412,75 @@ export class SyncApi extends TypedEmitter { this.#peerIds.delete(peerId) this.#pendingDiscoveryKeys.delete(protomux) } + + /** + * @param {Set} roleDocIds + * @returns {Promise} + */ + #handleRoleUpdate = async (roleDocIds) => { + /** @type {Promise[]} */ const coreOwnershipPromises = [] + for (const roleDocId of roleDocIds) { + // Ignore docs about ourselves + if (roleDocId === this.#coreManager.deviceId) continue + coreOwnershipPromises.push(this.#coreOwnership.get(roleDocId)) + } + + const ownershipResults = await Promise.allSettled(coreOwnershipPromises) + + for (const result of ownershipResults) { + if (result.status === 'rejected') continue + await this.#validateRoleAndAddCoresForPeer(result.value) + this.#l.log('Added cores for device %S', result.value.docId) + } + } + + /** + * @param {Set} coreOwnershipDocIds + * @returns {Promise} + */ + #handleCoreOwnershipUpdate = async (coreOwnershipDocIds) => { + /** @type {Promise[]} */ const promises = [] + + for (const coreOwnershipDocId of coreOwnershipDocIds) { + // Ignore our own ownership doc - we don't need to add cores for ourselves + if (coreOwnershipDocId === this.#coreManager.deviceId) continue + + promises.push( + (async () => { + try { + const coreOwnershipDoc = await this.#coreOwnership.get( + coreOwnershipDocId + ) + await this.#validateRoleAndAddCoresForPeer(coreOwnershipDoc) + this.#l.log('Added cores for device %S', coreOwnershipDocId) + } catch (_) { + // Ignore, we'll add these when the role is added + this.#l.log('No role for device %S', coreOwnershipDocId) + } + })() + ) + } + + await Promise.all(promises) + } + + /** + * @param {CoreOwnershipDoc} coreOwnership + * @returns {Promise} + */ + async #validateRoleAndAddCoresForPeer(coreOwnership) { + const peerDeviceId = coreOwnership.docId + const role = await this.#roles.getRole(peerDeviceId) + // We only add cores for peers that have been explicitly written into the + // project. If in the future we allow syncing from blocked peers, we can + // drop the role check here, and just add cores. + if (role.roleId === NO_ROLE_ID) return + for (const ns of NAMESPACES) { + if (ns === 'auth') continue + const coreKey = Buffer.from(coreOwnership[`${ns}CoreId`], 'hex') + this.#coreManager.addCore(coreKey, ns) + } + } } /** diff --git a/test-e2e/sync.js b/test-e2e/sync.js index d8dafa2ab..c436ee739 100644 --- a/test-e2e/sync.js +++ b/test-e2e/sync.js @@ -688,20 +688,14 @@ test('no sync capabilities === no namespaces sync apart from auth', async (t) => assert.equal(blockedState.data.localState.have, 0) // no data docs synced for (const ns of NAMESPACES) { - if (ns === 'auth') { - assert.equal(invitorState[ns].coreCount, 3) - assert.equal(inviteeState[ns].coreCount, 3) - assert.equal(blockedState[ns].coreCount, 3) - } else if (PRESYNC_NAMESPACES.includes(ns)) { - assert.equal(invitorState[ns].coreCount, 3) - assert.equal(inviteeState[ns].coreCount, 3) - assert.equal(blockedState[ns].coreCount, 1) - } else { - assert.equal(invitorState[ns].coreCount, 2) - assert.equal(inviteeState[ns].coreCount, 2) - assert.equal(blockedState[ns].coreCount, 1) - } - assert.deepEqual(invitorState[ns].localState, inviteeState[ns].localState) + assert.equal(invitorState[ns].coreCount, 3, ns) + assert.equal(inviteeState[ns].coreCount, 3, ns) + assert.equal(blockedState[ns].coreCount, 3, ns) + assert.deepEqual( + invitorState[ns].localState, + inviteeState[ns].localState, + ns + ) } await disconnect1() diff --git a/tests/core-manager.js b/tests/core-manager.js index 3de592461..54e9886d7 100644 --- a/tests/core-manager.js +++ b/tests/core-manager.js @@ -516,7 +516,7 @@ test('unreplicate', async (t) => { } }) -test('deleteOthersData()', async () => { +test('deleteOthersData()', async (t) => { await temporaryDirectoryTask(async (tempPath) => { const projectKey = randomBytes(32) @@ -567,12 +567,8 @@ test('deleteOthersData()', async () => { .map((_, i) => 'block' + i) ) - /// Replicate - const n1 = new NoiseSecretStream(true) - const n2 = new NoiseSecretStream(false) - n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream) - cm1[kCoreManagerReplicate](n1) - cm2[kCoreManagerReplicate](n2) + const { destroy } = replicate(cm1, cm2) + t.after(destroy) // This delay is needed in order for replication to finish properly await new Promise((res) => setTimeout(res, 200)) @@ -669,10 +665,6 @@ test('deleteOthersData()', async () => { 0, 'peer 1 `cores` table has no info about `data` core from peer 2' ) - - n1.destroy() - n2.destroy() - await Promise.all([once(n1, 'close'), once(n2, 'close')]) }) }) diff --git a/tests/helpers/core-manager.js b/tests/helpers/core-manager.js index 526c55b25..2440fc66a 100644 --- a/tests/helpers/core-manager.js +++ b/tests/helpers/core-manager.js @@ -9,7 +9,8 @@ import RAM from 'random-access-memory' import NoiseSecretStream from '@hyperswarm/secret-stream' import { drizzle } from 'drizzle-orm/better-sqlite3' import { migrate } from 'drizzle-orm/better-sqlite3/migrator' -/** @typedef {(typeof import('../../src/constants.js').NAMESPACES)[number]} Namespace */ +import { NAMESPACES } from '../../src/constants.js' +/** @import { Namespace } from '../../src/types.js' */ /** * @@ -49,6 +50,20 @@ const destroyStream = (stream) => stream.destroy() }) +/** + * @param {CoreManager} cmToAdd + * @param {CoreManager} cmToReceive + * @returns {Promise} + */ +async function addWriterCores(cmToAdd, cmToReceive) { + await cmToAdd.ready() + for (const ns of NAMESPACES) { + if (ns === 'auth') continue + const core = cmToAdd.getWriterCore(ns) + cmToReceive.addCore(core.key, ns) + } +} + /** * * @param {CoreManager} cm1 @@ -80,6 +95,9 @@ export function replicate( cm1[kCoreManagerReplicate](n1) cm2[kCoreManagerReplicate](n2) + addWriterCores(cm1, cm2) + addWriterCores(cm2, cm1) + return { async destroy() { await Promise.all([n1, n2].map(destroyStream))