diff --git a/proto/extensions.proto b/proto/extensions.proto index 7bab5f61e..02e84eb49 100644 --- a/proto/extensions.proto +++ b/proto/extensions.proto @@ -19,3 +19,11 @@ message HaveExtension { bytes encodedBitfield = 3; Namespace namespace = 4; } + +// A map of blob types and variants that a peer intends to download +message DownloadIntentExtension { + message DownloadIntent { + repeated string variants = 1; + } + map downloadIntents = 1; +} diff --git a/src/core-manager/index.js b/src/core-manager/index.js index 9a20341c1..c119cdee8 100644 --- a/src/core-manager/index.js +++ b/src/core-manager/index.js @@ -4,16 +4,21 @@ import { debounce } from 'throttle-debounce' import assert from 'node:assert/strict' import { sql, eq } from 'drizzle-orm' -import { HaveExtension, ProjectExtension } from '../generated/extensions.js' +import { + HaveExtension, + ProjectExtension, + DownloadIntentExtension, +} from '../generated/extensions.js' import { Logger } from '../logger.js' import { NAMESPACES } from '../constants.js' import { noop } from '../utils.js' import { coresTable } from '../schema/project.js' import * as rle from './bitfield-rle.js' import { CoreIndex } from './core-index.js' +import mapObject from 'map-obj' /** @import Hypercore from 'hypercore' */ -/** @import { HypercorePeer, Namespace } from '../types.js' */ +/** @import { BlobFilter, GenericBlobFilter, HypercorePeer, Namespace } from '../types.js' */ const WRITER_CORE_PREHAVES_DEBOUNCE_DELAY = 1000 @@ -25,6 +30,7 @@ export const kCoreManagerReplicate = Symbol('replicate core manager') * @typedef {Object} Events * @property {(coreRecord: CoreRecord) => void} add-core * @property {(namespace: Namespace, msg: { coreDiscoveryId: string, peerId: string, start: number, bitfield: Uint32Array }) => void} peer-have + * @property {(blobFilter: GenericBlobFilter, peerId: string) => void} peer-download-intent */ /** @@ -46,6 +52,7 @@ export class CoreManager extends TypedEmitter { #deviceId #l #autoDownload + #downloadIntentExtension static get namespaces() { return NAMESPACES @@ -158,6 +165,16 @@ export class CoreManager extends TypedEmitter { }, }) + this.#downloadIntentExtension = this.creatorCore.registerExtension( + 'mapeo/download-intent', + { + encoding: DownloadIntentCodec, + onmessage: (msg, peer) => { + this.#handleDownloadIntentMessage(msg, peer) + }, + } + ) + this.creatorCore.on('peer-add', (peer) => { this.#sendHaves(peer, this.#coreIndex).catch(() => { this.#l.log('Failed to send pre-haves to newly-connected peer') @@ -396,6 +413,23 @@ export class CoreManager extends TypedEmitter { }) } + /** + * @param {GenericBlobFilter} blobFilter + * @param {HypercorePeer} peer + */ + #handleDownloadIntentMessage(blobFilter, peer) { + const peerId = peer.remotePublicKey.toString('hex') + this.emit('peer-download-intent', blobFilter, peerId) + } + + /** + * @param {BlobFilter} blobFilter + * @param {HypercorePeer} peer + */ + sendDownloadIntents(blobFilter, peer) { + this.#downloadIntentExtension.send(blobFilter, peer) + } + /** * * @param {HypercorePeer} peer @@ -506,3 +540,25 @@ const HaveExtensionCodec = { } }, } + +const DownloadIntentCodec = { + /** @param {BlobFilter} filter */ + encode(filter) { + const downloadIntents = mapObject(filter, (key, value) => [ + key, + { variants: value || [] }, + ]) + return DownloadIntentExtension.encode({ downloadIntents }).finish() + }, + /** + * @param {Buffer | Uint8Array} buf + * @returns {GenericBlobFilter} + */ + decode(buf) { + const msg = DownloadIntentExtension.decode(buf) + return mapObject(msg.downloadIntents, (key, value) => [ + key + '', // keep TS happy + value.variants, + ]) + }, +} diff --git a/src/datastore/README.md b/src/datastore/README.md index 8a2182b75..cc1f2a841 100644 --- a/src/datastore/README.md +++ b/src/datastore/README.md @@ -33,8 +33,6 @@ datastore.on('index-state', ({ current, remaining, entriesPerSecond }) => { // show state to user that indexing is happening } }) - -const { current, remaining, entriesPerSecond } = datastore.getIndexState() ``` ## API docs diff --git a/src/datastore/index.js b/src/datastore/index.js index 43712654f..990146122 100644 --- a/src/datastore/index.js +++ b/src/datastore/index.js @@ -91,10 +91,6 @@ export class DataStore extends TypedEmitter { return this.#writerCore } - getIndexState() { - return this.#coreIndexer.state - } - /** * * @param {MultiCoreIndexer.Entry<'binary'>[]} entries diff --git a/src/generated/extensions.d.ts b/src/generated/extensions.d.ts index f8841e2ad..30d6d9317 100644 --- a/src/generated/extensions.d.ts +++ b/src/generated/extensions.d.ts @@ -19,6 +19,19 @@ export declare const HaveExtension_Namespace: { export type HaveExtension_Namespace = typeof HaveExtension_Namespace[keyof typeof HaveExtension_Namespace]; export declare function haveExtension_NamespaceFromJSON(object: any): HaveExtension_Namespace; export declare function haveExtension_NamespaceToNumber(object: HaveExtension_Namespace): number; +/** A map of blob types and variants that a peer intends to download */ +export interface DownloadIntentExtension { + downloadIntents: { + [key: string]: DownloadIntentExtension_DownloadIntent; + }; +} +export interface DownloadIntentExtension_DownloadIntent { + variants: string[]; +} +export interface DownloadIntentExtension_DownloadIntentsEntry { + key: string; + value: DownloadIntentExtension_DownloadIntent | undefined; +} export declare const ProjectExtension: { encode(message: ProjectExtension, writer?: _m0.Writer): _m0.Writer; decode(input: _m0.Reader | Uint8Array, length?: number): ProjectExtension; @@ -31,6 +44,24 @@ export declare const HaveExtension: { create, I>>(base?: I): HaveExtension; fromPartial, I>>(object: I): HaveExtension; }; +export declare const DownloadIntentExtension: { + encode(message: DownloadIntentExtension, writer?: _m0.Writer): _m0.Writer; + decode(input: _m0.Reader | Uint8Array, length?: number): DownloadIntentExtension; + create, I>>(base?: I): DownloadIntentExtension; + fromPartial, I>>(object: I): DownloadIntentExtension; +}; +export declare const DownloadIntentExtension_DownloadIntent: { + encode(message: DownloadIntentExtension_DownloadIntent, writer?: _m0.Writer): _m0.Writer; + decode(input: _m0.Reader | Uint8Array, length?: number): DownloadIntentExtension_DownloadIntent; + create, I>>(base?: I): DownloadIntentExtension_DownloadIntent; + fromPartial, I>>(object: I): DownloadIntentExtension_DownloadIntent; +}; +export declare const DownloadIntentExtension_DownloadIntentsEntry: { + encode(message: DownloadIntentExtension_DownloadIntentsEntry, writer?: _m0.Writer): _m0.Writer; + decode(input: _m0.Reader | Uint8Array, length?: number): DownloadIntentExtension_DownloadIntentsEntry; + create, I>>(base?: I): DownloadIntentExtension_DownloadIntentsEntry; + fromPartial, I>>(object: I): DownloadIntentExtension_DownloadIntentsEntry; +}; type Builtin = Date | Function | Uint8Array | string | number | boolean | undefined; type DeepPartial = T extends Builtin ? T : T extends Array ? Array> : T extends ReadonlyArray ? ReadonlyArray> : T extends {} ? { [K in keyof T]?: DeepPartial; diff --git a/src/generated/extensions.js b/src/generated/extensions.js index 088e1cc22..f601f7e0a 100644 --- a/src/generated/extensions.js +++ b/src/generated/extensions.js @@ -169,6 +169,156 @@ export var HaveExtension = { return message; }, }; +function createBaseDownloadIntentExtension() { + return { downloadIntents: {} }; +} +export var DownloadIntentExtension = { + encode: function (message, writer) { + if (writer === void 0) { writer = _m0.Writer.create(); } + Object.entries(message.downloadIntents).forEach(function (_a) { + var key = _a[0], value = _a[1]; + DownloadIntentExtension_DownloadIntentsEntry.encode({ key: key, value: value }, writer.uint32(10).fork()) + .ldelim(); + }); + return writer; + }, + decode: function (input, length) { + var reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + var end = length === undefined ? reader.len : reader.pos + length; + var message = createBaseDownloadIntentExtension(); + while (reader.pos < end) { + var tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + var entry1 = DownloadIntentExtension_DownloadIntentsEntry.decode(reader, reader.uint32()); + if (entry1.value !== undefined) { + message.downloadIntents[entry1.key] = entry1.value; + } + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + create: function (base) { + return DownloadIntentExtension.fromPartial(base !== null && base !== void 0 ? base : {}); + }, + fromPartial: function (object) { + var _a; + var message = createBaseDownloadIntentExtension(); + message.downloadIntents = Object.entries((_a = object.downloadIntents) !== null && _a !== void 0 ? _a : {}).reduce(function (acc, _a) { + var key = _a[0], value = _a[1]; + if (value !== undefined) { + acc[key] = DownloadIntentExtension_DownloadIntent.fromPartial(value); + } + return acc; + }, {}); + return message; + }, +}; +function createBaseDownloadIntentExtension_DownloadIntent() { + return { variants: [] }; +} +export var DownloadIntentExtension_DownloadIntent = { + encode: function (message, writer) { + if (writer === void 0) { writer = _m0.Writer.create(); } + for (var _i = 0, _a = message.variants; _i < _a.length; _i++) { + var v = _a[_i]; + writer.uint32(10).string(v); + } + return writer; + }, + decode: function (input, length) { + var reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + var end = length === undefined ? reader.len : reader.pos + length; + var message = createBaseDownloadIntentExtension_DownloadIntent(); + while (reader.pos < end) { + var tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + message.variants.push(reader.string()); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + create: function (base) { + return DownloadIntentExtension_DownloadIntent.fromPartial(base !== null && base !== void 0 ? base : {}); + }, + fromPartial: function (object) { + var _a; + var message = createBaseDownloadIntentExtension_DownloadIntent(); + message.variants = ((_a = object.variants) === null || _a === void 0 ? void 0 : _a.map(function (e) { return e; })) || []; + return message; + }, +}; +function createBaseDownloadIntentExtension_DownloadIntentsEntry() { + return { key: "", value: undefined }; +} +export var DownloadIntentExtension_DownloadIntentsEntry = { + encode: function (message, writer) { + if (writer === void 0) { writer = _m0.Writer.create(); } + if (message.key !== "") { + writer.uint32(10).string(message.key); + } + if (message.value !== undefined) { + DownloadIntentExtension_DownloadIntent.encode(message.value, writer.uint32(18).fork()).ldelim(); + } + return writer; + }, + decode: function (input, length) { + var reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + var end = length === undefined ? reader.len : reader.pos + length; + var message = createBaseDownloadIntentExtension_DownloadIntentsEntry(); + while (reader.pos < end) { + var tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + message.key = reader.string(); + continue; + case 2: + if (tag !== 18) { + break; + } + message.value = DownloadIntentExtension_DownloadIntent.decode(reader, reader.uint32()); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + create: function (base) { + return DownloadIntentExtension_DownloadIntentsEntry.fromPartial(base !== null && base !== void 0 ? base : {}); + }, + fromPartial: function (object) { + var _a; + var message = createBaseDownloadIntentExtension_DownloadIntentsEntry(); + message.key = (_a = object.key) !== null && _a !== void 0 ? _a : ""; + message.value = (object.value !== undefined && object.value !== null) + ? DownloadIntentExtension_DownloadIntent.fromPartial(object.value) + : undefined; + return message; + }, +}; var tsProtoGlobalThis = (function () { if (typeof globalThis !== "undefined") { return globalThis; diff --git a/src/generated/extensions.ts b/src/generated/extensions.ts index eddb7efeb..0098c5aae 100644 --- a/src/generated/extensions.ts +++ b/src/generated/extensions.ts @@ -66,6 +66,20 @@ export function haveExtension_NamespaceToNumber(object: HaveExtension_Namespace) } } +/** A map of blob types and variants that a peer intends to download */ +export interface DownloadIntentExtension { + downloadIntents: { [key: string]: DownloadIntentExtension_DownloadIntent }; +} + +export interface DownloadIntentExtension_DownloadIntent { + variants: string[]; +} + +export interface DownloadIntentExtension_DownloadIntentsEntry { + key: string; + value: DownloadIntentExtension_DownloadIntent | undefined; +} + function createBaseProjectExtension(): ProjectExtension { return { authCoreKeys: [] }; } @@ -194,6 +208,173 @@ export const HaveExtension = { }, }; +function createBaseDownloadIntentExtension(): DownloadIntentExtension { + return { downloadIntents: {} }; +} + +export const DownloadIntentExtension = { + encode(message: DownloadIntentExtension, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + Object.entries(message.downloadIntents).forEach(([key, value]) => { + DownloadIntentExtension_DownloadIntentsEntry.encode({ key: key as any, value }, writer.uint32(10).fork()) + .ldelim(); + }); + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): DownloadIntentExtension { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseDownloadIntentExtension(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + const entry1 = DownloadIntentExtension_DownloadIntentsEntry.decode(reader, reader.uint32()); + if (entry1.value !== undefined) { + message.downloadIntents[entry1.key] = entry1.value; + } + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + create, I>>(base?: I): DownloadIntentExtension { + return DownloadIntentExtension.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>(object: I): DownloadIntentExtension { + const message = createBaseDownloadIntentExtension(); + message.downloadIntents = Object.entries(object.downloadIntents ?? {}).reduce< + { [key: string]: DownloadIntentExtension_DownloadIntent } + >((acc, [key, value]) => { + if (value !== undefined) { + acc[key] = DownloadIntentExtension_DownloadIntent.fromPartial(value); + } + return acc; + }, {}); + return message; + }, +}; + +function createBaseDownloadIntentExtension_DownloadIntent(): DownloadIntentExtension_DownloadIntent { + return { variants: [] }; +} + +export const DownloadIntentExtension_DownloadIntent = { + encode(message: DownloadIntentExtension_DownloadIntent, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + for (const v of message.variants) { + writer.uint32(10).string(v!); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): DownloadIntentExtension_DownloadIntent { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseDownloadIntentExtension_DownloadIntent(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.variants.push(reader.string()); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + create, I>>( + base?: I, + ): DownloadIntentExtension_DownloadIntent { + return DownloadIntentExtension_DownloadIntent.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>( + object: I, + ): DownloadIntentExtension_DownloadIntent { + const message = createBaseDownloadIntentExtension_DownloadIntent(); + message.variants = object.variants?.map((e) => e) || []; + return message; + }, +}; + +function createBaseDownloadIntentExtension_DownloadIntentsEntry(): DownloadIntentExtension_DownloadIntentsEntry { + return { key: "", value: undefined }; +} + +export const DownloadIntentExtension_DownloadIntentsEntry = { + encode(message: DownloadIntentExtension_DownloadIntentsEntry, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.key !== "") { + writer.uint32(10).string(message.key); + } + if (message.value !== undefined) { + DownloadIntentExtension_DownloadIntent.encode(message.value, writer.uint32(18).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): DownloadIntentExtension_DownloadIntentsEntry { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseDownloadIntentExtension_DownloadIntentsEntry(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.key = reader.string(); + continue; + case 2: + if (tag !== 18) { + break; + } + + message.value = DownloadIntentExtension_DownloadIntent.decode(reader, reader.uint32()); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + create, I>>( + base?: I, + ): DownloadIntentExtension_DownloadIntentsEntry { + return DownloadIntentExtension_DownloadIntentsEntry.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>( + object: I, + ): DownloadIntentExtension_DownloadIntentsEntry { + const message = createBaseDownloadIntentExtension_DownloadIntentsEntry(); + message.key = object.key ?? ""; + message.value = (object.value !== undefined && object.value !== null) + ? DownloadIntentExtension_DownloadIntent.fromPartial(object.value) + : undefined; + return message; + }, +}; + declare const self: any | undefined; declare const window: any | undefined; declare const global: any | undefined; diff --git a/src/mapeo-project.js b/src/mapeo-project.js index 07a7a550b..b3b59f806 100644 --- a/src/mapeo-project.js +++ b/src/mapeo-project.js @@ -371,6 +371,7 @@ export class MapeoProject extends TypedEmitter { coreManager: this.#coreManager, coreOwnership: this.#coreOwnership, roles: this.#roles, + blobDownloadFilter: null, logger: this.#l, }) @@ -677,6 +678,7 @@ export class MapeoProject extends TypedEmitter { isArchiveDevice ? null : NON_ARCHIVE_DEVICE_DOWNLOAD_FILTER ) this.#isArchiveDevice = isArchiveDevice + // TODO: call this.#syncApi[kSetBlobDownloadFilter]() } /** @returns {boolean} */ diff --git a/src/sync/sync-api.js b/src/sync/sync-api.js index 00a0cd29b..88e21a49e 100644 --- a/src/sync/sync-api.js +++ b/src/sync/sync-api.js @@ -17,6 +17,7 @@ export const kHandleDiscoveryKey = Symbol('handle discovery key') export const kSyncState = Symbol('sync state') export const kRequestFullStop = Symbol('background') export const kRescindFullStopRequest = Symbol('foreground') +export const kSetBlobDownloadFilter = Symbol('set isArchiveDevice') /** * @typedef {'initial' | 'full'} SyncType @@ -77,6 +78,7 @@ export class SyncApi extends TypedEmitter { /** @type {Map>} */ #pendingDiscoveryKeys = new Map() #l + #blobDownloadFilter /** * @@ -84,12 +86,21 @@ export class SyncApi extends TypedEmitter { * @param {import('../core-manager/index.js').CoreManager} opts.coreManager * @param {CoreOwnership} opts.coreOwnership * @param {import('../roles.js').Roles} opts.roles + * @param {import('../types.js').BlobFilter | null} opts.blobDownloadFilter * @param {number} [opts.throttleMs] * @param {Logger} [opts.logger] */ - constructor({ coreManager, throttleMs = 200, roles, logger, coreOwnership }) { + constructor({ + coreManager, + throttleMs = 200, + roles, + logger, + coreOwnership, + blobDownloadFilter, + }) { super() this.#l = Logger.create('syncApi', logger) + this.#blobDownloadFilter = blobDownloadFilter this.#coreManager = coreManager this.#coreOwnership = coreOwnership this.#roles = roles @@ -123,6 +134,15 @@ export class SyncApi extends TypedEmitter { .catch(noop) } + /** @param {import('../types.js').BlobFilter | null} blobDownloadFilter */ + [kSetBlobDownloadFilter](blobDownloadFilter) { + this.#blobDownloadFilter = blobDownloadFilter + if (!blobDownloadFilter) return // No download intents = intend to download everything + for (const peer of this.#coreManager.creatorCore.peers) { + this.#coreManager.sendDownloadIntents(blobDownloadFilter, peer) + } + } + /** @type {import('../local-peers.js').LocalPeersEvents['discovery-key']} */ [kHandleDiscoveryKey](discoveryKey, protomux) { const peerSyncController = this.#peerSyncControllers.get(protomux) @@ -363,7 +383,7 @@ export class SyncApi extends TypedEmitter { * will then handle validation of role records to ensure that the peer is * actually still part of the project. * - * @param {{ protomux: import('protomux') }} peer + * @param {import('../types.js').HypercorePeer & { protomux: import('protomux') }} peer */ #handlePeerAdd = (peer) => { const { protomux } = peer @@ -374,6 +394,9 @@ export class SyncApi extends TypedEmitter { ) return } + if (this.#blobDownloadFilter) { + this.#coreManager.sendDownloadIntents(this.#blobDownloadFilter, peer) + } const peerSyncController = new PeerSyncController({ protomux, coreManager: this.#coreManager, diff --git a/src/types.ts b/src/types.ts index 17db3d4a5..19be646be 100644 --- a/src/types.ts +++ b/src/types.ts @@ -43,12 +43,13 @@ export type BlobId = Simplify< }> > -type ArrayAtLeastOne = [T, ...T[]] - export type BlobFilter = RequireAtLeastOne<{ - [KeyType in BlobType]: ArrayAtLeastOne> + [KeyType in BlobType]: Array> }> +/** Map of blob types to array of blob variants */ +export type GenericBlobFilter = Record + export type MapeoDocMap = { [K in MapeoDoc['schemaName']]: Extract }