From 5ae541d6b4e40b5eddeecc7e8d5e5bea596b60bf Mon Sep 17 00:00:00 2001 From: Evan Hahn Date: Wed, 20 Nov 2024 08:34:18 -0600 Subject: [PATCH] feat: handle incoming blob filters (#956) *You may wish to [review this squashed commit as separate commits][0].* When you receive a blob filter from another peer, this updates their sync states. For example, if you receive a blob filter that says "I only want photo thumbnails", that peer's "wants" bitfield will be reduced. Closes [#682] and [#905]. [0]: https://github.com/digidem/comapeo-core/pull/956/commits [#682]: https://github.com/digidem/comapeo-core/issues/682 [#905]: https://github.com/digidem/comapeo-core/issues/905 --- src/blob-store/downloader.js | 6 +- src/blob-store/entries-stream.js | 3 +- src/blob-store/hyperdrive-index.js | 122 +++++++++++++++++++++++++++++ src/blob-store/index.js | 117 +-------------------------- src/discovery/local-discovery.js | 3 +- src/fastify-plugins/maps.js | 3 +- src/lib/error.js | 24 ++++++ src/mapeo-project.js | 71 ++++++++++++++--- src/sync/core-sync-state.js | 62 +++++++++------ src/sync/namespace-sync-state.js | 22 ++++++ src/sync/sync-api.js | 34 +++++++- src/sync/sync-state.js | 18 +++++ test-e2e/sync.js | 54 +++++++++++-- test/lib/error.js | 44 ++++++++++- test/sync/core-sync-state.js | 8 +- 15 files changed, 423 insertions(+), 168 deletions(-) create mode 100644 src/blob-store/hyperdrive-index.js diff --git a/src/blob-store/downloader.js b/src/blob-store/downloader.js index 2e6a3a24e..982a499ca 100644 --- a/src/blob-store/downloader.js +++ b/src/blob-store/downloader.js @@ -2,8 +2,8 @@ import { TypedEmitter } from 'tiny-typed-emitter' import { createEntriesStream } from './entries-stream.js' import { filePathMatchesFilter } from './utils.js' -/** @import Hyperdrive from 'hyperdrive' */ /** @import { BlobFilter } from '../types.js' */ +/** @import { THyperdriveIndex } from './hyperdrive-index.js' */ /** * Like hyperdrive.download() but 'live', and for multiple drives. 
@@ -26,7 +26,7 @@ import { filePathMatchesFilter } from './utils.js' * @extends {TypedEmitter<{ error: (error: Error) => void }>} */ export class Downloader extends TypedEmitter { - /** @type {import('./index.js').THyperdriveIndex} */ + /** @type {THyperdriveIndex} */ #driveIndex /** @type {Set<{ done(): Promise, destroy(): void }>} */ #queuedDownloads = new Set() @@ -36,7 +36,7 @@ export class Downloader extends TypedEmitter { #shouldDownloadFile /** - * @param {import('./index.js').THyperdriveIndex} driveIndex + * @param {THyperdriveIndex} driveIndex * @param {object} [options] * @param {BlobFilter | null} [options.filter] Filter blobs of specific types and/or sizes to download */ diff --git a/src/blob-store/entries-stream.js b/src/blob-store/entries-stream.js index f25567676..deb3c6492 100644 --- a/src/blob-store/entries-stream.js +++ b/src/blob-store/entries-stream.js @@ -5,12 +5,13 @@ import { noop } from '../utils.js' /** @import Hyperdrive from 'hyperdrive' */ /** @import { BlobStoreEntriesStream } from '../types.js' */ +/** @import { THyperdriveIndex } from './hyperdrive-index.js' */ const keyEncoding = new SubEncoder('files', 'utf-8') /** * - * @param {import('./index.js').THyperdriveIndex} driveIndex + * @param {THyperdriveIndex} driveIndex * @param {object} opts * @param {boolean} [opts.live=false] * @returns {BlobStoreEntriesStream} diff --git a/src/blob-store/hyperdrive-index.js b/src/blob-store/hyperdrive-index.js new file mode 100644 index 000000000..0f239bdfd --- /dev/null +++ b/src/blob-store/hyperdrive-index.js @@ -0,0 +1,122 @@ +import b4a from 'b4a' +import { discoveryKey } from 'hypercore-crypto' +import Hyperdrive from 'hyperdrive' +import util from 'node:util' +import { TypedEmitter } from 'tiny-typed-emitter' + +/** @typedef {HyperdriveIndexImpl} THyperdriveIndex */ + +/** + * @extends {TypedEmitter<{ 'add-drive': (drive: Hyperdrive) => void }>} + */ +export class HyperdriveIndexImpl extends TypedEmitter { + /** @type {Map} */ + #hyperdrives 
= new Map() + #writer + #writerKey + /** @param {import('../core-manager/index.js').CoreManager} coreManager */ + constructor(coreManager) { + super() + /** @type {undefined | Hyperdrive} */ + let writer + const corestore = new PretendCorestore({ coreManager }) + const blobIndexCores = coreManager.getCores('blobIndex') + const writerCoreRecord = coreManager.getWriterCore('blobIndex') + this.#writerKey = writerCoreRecord.key + for (const { key } of blobIndexCores) { + // @ts-ignore - we know pretendCorestore is not actually a Corestore + const drive = new Hyperdrive(corestore, key) + // We use the discovery key to derive the id for a drive + this.#hyperdrives.set(getDiscoveryId(key), drive) + if (key.equals(this.#writerKey)) { + writer = drive + } + } + if (!writer) { + throw new Error('Could not find a writer for the blobIndex namespace') + } + this.#writer = writer + + coreManager.on('add-core', ({ key, namespace }) => { + if (namespace !== 'blobIndex') return + // We use the discovery key to derive the id for a drive + const driveId = getDiscoveryId(key) + if (this.#hyperdrives.has(driveId)) return + // @ts-ignore - we know pretendCorestore is not actually a Corestore + const drive = new Hyperdrive(corestore, key) + this.#hyperdrives.set(driveId, drive) + this.emit('add-drive', drive) + }) + } + get writer() { + return this.#writer + } + get writerKey() { + return this.#writerKey + } + [Symbol.iterator]() { + return this.#hyperdrives.values() + } + /** @param {string} driveId */ + get(driveId) { + return this.#hyperdrives.get(driveId) + } +} + +/** + * Implements the `get()` method as used by hyperdrive-next. It returns the + * relevant cores from the Mapeo CoreManager. 
+ */ +class PretendCorestore { + #coreManager + /** + * @param {object} options + * @param {import('../core-manager/index.js').CoreManager} options.coreManager + */ + constructor({ coreManager }) { + this.#coreManager = coreManager + } + + /** + * @param {Buffer | { publicKey: Buffer } | { name: string }} opts + * @returns {import('hypercore')<"binary", Buffer> | undefined} + */ + get(opts) { + if (b4a.isBuffer(opts)) { + opts = { publicKey: opts } + } + if ('key' in opts) { + // @ts-ignore + opts.publicKey = opts.key + } + if ('publicKey' in opts) { + // NB! We should always add blobIndex (Hyperbee) cores to the core manager + // before we use them here. We would only reach the addCore path if the + // blob core is read from the hyperbee header (before it is added to the + // core manager) + return ( + this.#coreManager.getCoreByKey(opts.publicKey) || + this.#coreManager.addCore(opts.publicKey, 'blob').core + ) + } else if (opts.name === 'db') { + return this.#coreManager.getWriterCore('blobIndex').core + } else if (opts.name.includes('blobs')) { + return this.#coreManager.getWriterCore('blob').core + } else { + throw new Error( + 'Unsupported corestore.get() with opts ' + util.inspect(opts) + ) + } + } + + /** no-op */ + close() {} +} + +/** + * @param {Buffer} key Public key of hypercore + * @returns {string} Hex-encoded string of derived discovery key + */ +function getDiscoveryId(key) { + return discoveryKey(key).toString('hex') +} diff --git a/src/blob-store/index.js b/src/blob-store/index.js index b1f730037..d03a9e176 100644 --- a/src/blob-store/index.js +++ b/src/blob-store/index.js @@ -1,6 +1,3 @@ -import Hyperdrive from 'hyperdrive' -import b4a from 'b4a' -import util from 'node:util' import { pipeline } from 'node:stream' import { discoveryKey } from 'hypercore-crypto' import { Downloader } from './downloader.js' @@ -8,7 +5,9 @@ import { createEntriesStream } from './entries-stream.js' import { FilterEntriesStream } from './utils.js' import { noop } from 
'../utils.js' import { TypedEmitter } from 'tiny-typed-emitter' +import { HyperdriveIndexImpl as HyperdriveIndex } from './hyperdrive-index.js' +/** @import Hyperdrive from 'hyperdrive' */ /** @import { JsonObject } from 'type-fest' */ /** @import { Readable as NodeReadable } from 'node:stream' */ /** @import { Readable as StreamxReadable, Writable } from 'streamx' */ @@ -132,7 +131,7 @@ export class BlobStore extends TypedEmitter { * @param {object} opts * @param {boolean} [opts.live=false] Set to `true` to get a live stream of entries * @param {import('./utils.js').GenericBlobFilter | null} [opts.filter] Filter blob types and/or variants in returned entries. Filter is { [BlobType]: BlobVariants[] }. - * @returns + * @returns {BlobStoreEntriesStream} */ createEntriesReadStream({ live = false, filter } = {}) { const entriesStream = createEntriesStream(this.#driveIndex, { live }) @@ -249,66 +248,6 @@ export class BlobStore extends TypedEmitter { } } -// Don't want to export the class, but do want to export the type. 
-/** @typedef {HyperdriveIndex} THyperdriveIndex */ - -/** - * @extends {TypedEmitter<{ 'add-drive': (drive: Hyperdrive) => void }>} - */ -class HyperdriveIndex extends TypedEmitter { - /** @type {Map} */ - #hyperdrives = new Map() - #writer - #writerKey - /** @param {import('../core-manager/index.js').CoreManager} coreManager */ - constructor(coreManager) { - super() - /** @type {undefined | Hyperdrive} */ - let writer - const corestore = new PretendCorestore({ coreManager }) - const blobIndexCores = coreManager.getCores('blobIndex') - const writerCoreRecord = coreManager.getWriterCore('blobIndex') - this.#writerKey = writerCoreRecord.key - for (const { key } of blobIndexCores) { - // @ts-ignore - we know pretendCorestore is not actually a Corestore - const drive = new Hyperdrive(corestore, key) - // We use the discovery key to derive the id for a drive - this.#hyperdrives.set(getDiscoveryId(key), drive) - if (key.equals(this.#writerKey)) { - writer = drive - } - } - if (!writer) { - throw new Error('Could not find a writer for the blobIndex namespace') - } - this.#writer = writer - - coreManager.on('add-core', ({ key, namespace }) => { - if (namespace !== 'blobIndex') return - // We use the discovery key to derive the id for a drive - const driveId = getDiscoveryId(key) - if (this.#hyperdrives.has(driveId)) return - // @ts-ignore - we know pretendCorestore is not actually a Corestore - const drive = new Hyperdrive(corestore, key) - this.#hyperdrives.set(driveId, drive) - this.emit('add-drive', drive) - }) - } - get writer() { - return this.#writer - } - get writerKey() { - return this.#writerKey - } - [Symbol.iterator]() { - return this.#hyperdrives.values() - } - /** @param {string} driveId */ - get(driveId) { - return this.#hyperdrives.get(driveId) - } -} - /** * @template {object} T * @template {object} U @@ -334,56 +273,6 @@ function makePath({ type, variant, name }) { return `/${type}/${variant}/${name}` } -/** - * Implements the `get()` method as used by 
hyperdrive-next. It returns the - * relevant cores from the Mapeo CoreManager. - */ -class PretendCorestore { - #coreManager - /** - * @param {object} options - * @param {import('../core-manager/index.js').CoreManager} options.coreManager - */ - constructor({ coreManager }) { - this.#coreManager = coreManager - } - - /** - * @param {Buffer | { publicKey: Buffer } | { name: string }} opts - * @returns {import('hypercore')<"binary", Buffer> | undefined} - */ - get(opts) { - if (b4a.isBuffer(opts)) { - opts = { publicKey: opts } - } - if ('key' in opts) { - // @ts-ignore - opts.publicKey = opts.key - } - if ('publicKey' in opts) { - // NB! We should always add blobIndex (Hyperbee) cores to the core manager - // before we use them here. We would only reach the addCore path if the - // blob core is read from the hyperbee header (before it is added to the - // core manager) - return ( - this.#coreManager.getCoreByKey(opts.publicKey) || - this.#coreManager.addCore(opts.publicKey, 'blob').core - ) - } else if (opts.name === 'db') { - return this.#coreManager.getWriterCore('blobIndex').core - } else if (opts.name.includes('blobs')) { - return this.#coreManager.getWriterCore('blob').core - } else { - throw new Error( - 'Unsupported corestore.get() with opts ' + util.inspect(opts) - ) - } - } - - /** no-op */ - close() {} -} - /** * @param {Buffer} key Public key of hypercore * @returns {string} Hex-encoded string of derived discovery key diff --git a/src/discovery/local-discovery.js b/src/discovery/local-discovery.js index 5fdbf11e7..5f1574a11 100644 --- a/src/discovery/local-discovery.js +++ b/src/discovery/local-discovery.js @@ -9,6 +9,7 @@ import StartStopStateMachine from 'start-stop-state-machine' import pTimeout from 'p-timeout' import { keyToPublicId } from '@mapeo/crypto' import { Logger } from '../logger.js' +import { getErrorCode } from '../lib/error.js' /** @import { OpenedNoiseStream } from '../lib/noise-secret-stream-helpers.js' */ /** @typedef {{ publicKey: 
Buffer, secretKey: Buffer }} Keypair */ @@ -117,7 +118,7 @@ export class LocalDiscovery extends TypedEmitter { /** @param {Error} e */ function onSocketError(e) { - if ('code' in e && e.code === 'EPIPE') { + if (getErrorCode(e) === 'EPIPE') { socket.destroy() if (secretStream) { secretStream.destroy() diff --git a/src/fastify-plugins/maps.js b/src/fastify-plugins/maps.js index d3df66e66..04cf1dc19 100644 --- a/src/fastify-plugins/maps.js +++ b/src/fastify-plugins/maps.js @@ -5,6 +5,7 @@ import { ReaderWatch, Server as SMPServerPlugin } from 'styled-map-package' import { noop } from '../utils.js' import { NotFoundError, ENOENTError } from './utils.js' +import { getErrorCode } from '../lib/error.js' /** @import { FastifyPluginAsync } from 'fastify' */ /** @import { Stats } from 'node:fs' */ @@ -56,7 +57,7 @@ export async function plugin(fastify, opts) { try { stats = await fs.stat(customMapPath) } catch (err) { - if (err instanceof Error && 'code' in err && err.code === 'ENOENT') { + if (getErrorCode(err) === 'ENOENT') { throw new ENOENTError(customMapPath) } diff --git a/src/lib/error.js b/src/lib/error.js index 41cbe5544..61436dd22 100644 --- a/src/lib/error.js +++ b/src/lib/error.js @@ -21,6 +21,30 @@ export class ErrorWithCode extends Error { } } +/** + * If the argument is an `Error` instance, return its `code` property if it is a string. + * Otherwise, returns `undefined`. + * + * @param {unknown} maybeError + * @returns {undefined | string} + * @example + * try { + * // do something + * } catch (err) { + * console.error(getErrorCode(err)) + * } + */ +export function getErrorCode(maybeError) { + if ( + maybeError instanceof Error && + 'code' in maybeError && + typeof maybeError.code === 'string' + ) { + return maybeError.code + } + return undefined +} + /** * Get the error message from an object if possible. * Otherwise, stringify the argument. 
diff --git a/src/mapeo-project.js b/src/mapeo-project.js index 331d64a62..c8d703310 100644 --- a/src/mapeo-project.js +++ b/src/mapeo-project.js @@ -49,7 +49,10 @@ import { omit } from './lib/omit.js' import { MemberApi } from './member-api.js' import { SyncApi, + kAddBlobWantRange, + kClearBlobWantRanges, kHandleDiscoveryKey, + kSetBlobDownloadFilter, kWaitForInitialSyncWithPeer, } from './sync/sync-api.js' import { Logger } from './logger.js' @@ -57,8 +60,9 @@ import { IconApi } from './icon-api.js' import { readConfig } from './config-import.js' import TranslationApi from './translation-api.js' import { NotFoundError } from './errors.js' +import { getErrorCode, getErrorMessage } from './lib/error.js' /** @import { ProjectSettingsValue } from '@comapeo/schema' */ -/** @import { CoreStorage, KeyPair, Namespace, ReplicationStream } from './types.js' */ +/** @import { CoreStorage, BlobFilter, BlobStoreEntriesStream, KeyPair, Namespace, ReplicationStream } from './types.js' */ /** @typedef {Omit} EditableProjectSettings */ /** @typedef {ProjectSettingsValue['configMetadata']} ConfigMetadata */ @@ -154,6 +158,8 @@ export class MapeoProject extends TypedEmitter { const getReplicationStream = this[kProjectReplicate].bind(this, true) + const blobDownloadFilter = getBlobDownloadFilter(isArchiveDevice) + ///////// 1. Setup database this.#sqlite = new Database(dbPath) @@ -370,9 +376,7 @@ export class MapeoProject extends TypedEmitter { this.#blobStore = new BlobStore({ coreManager: this.#coreManager, - downloadFilter: isArchiveDevice - ? 
null - : NON_ARCHIVE_DEVICE_DOWNLOAD_FILTER, + downloadFilter: blobDownloadFilter, }) this.#blobStore.on('error', (err) => { @@ -408,7 +412,7 @@ export class MapeoProject extends TypedEmitter { coreManager: this.#coreManager, coreOwnership: this.#coreOwnership, roles: this.#roles, - blobDownloadFilter: null, + blobDownloadFilter, logger: this.#l, getServerWebsocketUrls: async () => { const members = await this.#memberApi.getMany() @@ -430,6 +434,48 @@ export class MapeoProject extends TypedEmitter { getReplicationStream, }) + /** @type {Map} */ + const entriesReadStreams = new Map() + + this.#coreManager.on('peer-download-intent', async (filter, peerId) => { + entriesReadStreams.get(peerId)?.destroy() + + const entriesReadStream = this.#blobStore.createEntriesReadStream({ + live: true, + filter, + }) + entriesReadStreams.set(peerId, entriesReadStream) + + entriesReadStream.once('close', () => { + if (entriesReadStreams.get(peerId) === entriesReadStream) { + entriesReadStreams.delete(peerId) + } + }) + + this.#syncApi[kClearBlobWantRanges](peerId) + + try { + for await (const entry of entriesReadStream) { + const { blockOffset, blockLength } = entry.value.blob + this.#syncApi[kAddBlobWantRange](peerId, blockOffset, blockLength) + } + } catch (err) { + if (getErrorCode(err) === 'ERR_STREAM_PREMATURE_CLOSE') return + this.#l.log( + 'Error getting blob entries stream for peer %h: %s', + peerId, + getErrorMessage(err) + ) + } + }) + + this.#coreManager.creatorCore.on('peer-remove', (peer) => { + const peerKey = peer.protomux.stream.remotePublicKey + const peerId = peerKey.toString('hex') + entriesReadStreams.get(peerId)?.destroy() + entriesReadStreams.delete(peerId) + }) + this.#translationApi = new TranslationApi({ dataType: this.#dataTypes.translation, }) @@ -740,11 +786,10 @@ export class MapeoProject extends TypedEmitter { /** @param {boolean} isArchiveDevice */ async [kSetIsArchiveDevice](isArchiveDevice) { if (this.#isArchiveDevice === isArchiveDevice) return - 
this.#blobStore.setDownloadFilter( - isArchiveDevice ? null : NON_ARCHIVE_DEVICE_DOWNLOAD_FILTER - ) + const blobDownloadFilter = getBlobDownloadFilter(isArchiveDevice) + this.#blobStore.setDownloadFilter(blobDownloadFilter) + this.#syncApi[kSetBlobDownloadFilter](blobDownloadFilter) this.#isArchiveDevice = isArchiveDevice - // TODO: call this.#syncApi[kSetBlobDownloadFilter]() } /** @returns {boolean} */ @@ -1010,6 +1055,14 @@ export class MapeoProject extends TypedEmitter { } } +/** + * @param {boolean} isArchiveDevice + * @returns {null | BlobFilter} + */ +function getBlobDownloadFilter(isArchiveDevice) { + return isArchiveDevice ? null : NON_ARCHIVE_DEVICE_DOWNLOAD_FILTER +} + /** * @param {import("@comapeo/schema").ProjectSettings & { forks: string[] }} projectDoc * @returns {EditableProjectSettings} diff --git a/src/sync/core-sync-state.js b/src/sync/core-sync-state.js index b96b241de..8f075708a 100644 --- a/src/sync/core-sync-state.js +++ b/src/sync/core-sync-state.js @@ -182,13 +182,23 @@ export class CoreSyncState { * blocks/ranges that are added here * * @param {PeerId} peerId - * @param {Array<{ start: number, length: number }>} ranges + * @param {number} start + * @param {number} length + * @returns {void} */ - setPeerWants(peerId, ranges) { + addWantRange(peerId, start, length) { const peerState = this.#getOrCreatePeerState(peerId) - for (const { start, length } of ranges) { - peerState.setWantRange({ start, length }) - } + peerState.addWantRange(start, length) + this.#update() + } + + /** + * @param {PeerId} peerId + * @returns {void} + */ + clearWantRanges(peerId) { + const peerState = this.#getOrCreatePeerState(peerId) + peerState.clearWantRanges() this.#update() } @@ -291,14 +301,13 @@ export class PeerState { #preHaves = new RemoteBitfield() /** @type {HypercoreRemoteBitfield | undefined} */ #haves - /** @type {Bitfield} */ - #wants = new RemoteBitfield() + /** + * What blocks do we want? If `null`, we want everything. 
+ * @type {null | Bitfield} + */ + #wants = null /** @type {PeerNamespaceState['status']} */ status = 'stopped' - #wantAll - constructor({ wantAll = true } = {}) { - this.#wantAll = wantAll - } get preHavesBitfield() { return this.#preHaves } @@ -316,18 +325,27 @@ export class PeerState { this.#haves = bitfield } /** - * Set a range of blocks that a peer wants. This is not part of the Hypercore + * Add a range of blocks that a peer wants. This is not part of the Hypercore * protocol, so we need our own extension messages that a peer can use to * inform us which blocks they are interested in. For most cores peers always - * want all blocks, but for blob cores often peers only want preview or + * want all blocks, but for blob cores peers may only want preview or * thumbnail versions of media * - * @param {{ start: number, length: number }} range + * @param {number} start + * @param {number} length + * @returns {void} */ - setWantRange({ start, length }) { - this.#wantAll = false + addWantRange(start, length) { + this.#wants ??= new RemoteBitfield() this.#wants.setRange(start, length, true) } + /** + * Set the range of blocks that this peer wants to the empty set. In other + * words, this peer wants nothing from this core. + */ + clearWantRanges() { + this.#wants = new RemoteBitfield() + } /** * Returns whether the peer has the block at `index`. If a pre-have bitfield * has been passed, this is used if no connected peer bitfield is available. @@ -355,8 +373,7 @@ export class PeerState { * @param {number} index */ want(index) { - if (this.#wantAll) return true - return this.#wants.get(index) + return this.#wants ? 
this.#wants.get(index) : true } /** * Return the "wants" for the 32 blocks from `index`, as a 32-bit integer @@ -366,11 +383,10 @@ export class PeerState { * the 32 blocks from `index` */ wantWord(index) { - if (this.#wantAll) { - // This is a 32-bit number with all bits set - return 2 ** 32 - 1 - } - return getBitfieldWord(this.#wants, index) + return this.#wants + ? getBitfieldWord(this.#wants, index) + : // This is a 32-bit number with all bits set + 2 ** 32 - 1 } } diff --git a/src/sync/namespace-sync-state.js b/src/sync/namespace-sync-state.js index 699f90094..3080c9d10 100644 --- a/src/sync/namespace-sync-state.js +++ b/src/sync/namespace-sync-state.js @@ -136,6 +136,28 @@ export class NamespaceSyncState { this.#getCoreState(coreDiscoveryId).insertPreHaves(peerId, start, bitfield) } + /** + * @param {string} peerId + * @param {number} start + * @param {number} length + * @returns {void} + */ + addWantRange(peerId, start, length) { + for (const coreState of this.#coreStates.values()) { + coreState.addWantRange(peerId, start, length) + } + } + + /** + * @param {string} peerId + * @returns {void} + */ + clearWantRanges(peerId) { + for (const coreState of this.#coreStates.values()) { + coreState.clearWantRanges(peerId) + } + } + /** * @param {string} discoveryId */ diff --git a/src/sync/sync-api.js b/src/sync/sync-api.js index ea5ed77c4..5ed6e1674 100644 --- a/src/sync/sync-api.js +++ b/src/sync/sync-api.js @@ -16,7 +16,7 @@ import { NO_ROLE_ID } from '../roles.js' /** @import * as http from 'node:http' */ /** @import { CoreOwnership } from '../core-ownership.js' */ /** @import { OpenedNoiseStream } from '../lib/noise-secret-stream-helpers.js' */ -/** @import { ReplicationStream } from '../types.js' */ +/** @import { BlobFilter, ReplicationStream } from '../types.js' */ export const kHandleDiscoveryKey = Symbol('handle discovery key') export const kSyncState = Symbol('sync state') @@ -26,6 +26,8 @@ export const kWaitForInitialSyncWithPeer = Symbol( 'wait for 
initial sync with peer' ) export const kSetBlobDownloadFilter = Symbol('set isArchiveDevice') +export const kAddBlobWantRange = Symbol('add blob want range') +export const kClearBlobWantRanges = Symbol('clear blob want ranges') /** * @typedef {'initial' | 'full'} SyncType @@ -91,7 +93,8 @@ export class SyncApi extends TypedEmitter { #getReplicationStream /** @type {Map} */ #serverWebsockets = new Map() - #blobDownloadFilter + /** @type {null | BlobFilter} */ + #blobDownloadFilter = null /** * @param {object} opts @@ -100,7 +103,7 @@ export class SyncApi extends TypedEmitter { * @param {import('../roles.js').Roles} opts.roles * @param {() => Promise>} opts.getServerWebsocketUrls * @param {() => ReplicationStream} opts.getReplicationStream - * @param {import('../types.js').BlobFilter | null} opts.blobDownloadFilter + * @param {null | BlobFilter} opts.blobDownloadFilter * @param {number} [opts.throttleMs] * @param {Logger} [opts.logger] */ @@ -116,7 +119,6 @@ export class SyncApi extends TypedEmitter { }) { super() this.#l = Logger.create('syncApi', logger) - this.#blobDownloadFilter = blobDownloadFilter this.#coreManager = coreManager this.#coreOwnership = coreOwnership this.#roles = roles @@ -133,6 +135,8 @@ export class SyncApi extends TypedEmitter { this.#updateState(namespaceSyncState) }) + this[kSetBlobDownloadFilter](blobDownloadFilter) + this.#coreManager.creatorCore.on('peer-add', this.#handlePeerAdd) this.#coreManager.creatorCore.on('peer-remove', this.#handlePeerDisconnect) @@ -161,6 +165,28 @@ export class SyncApi extends TypedEmitter { } } + /** + * Add some blob blocks this peer wants. + * + * @param {string} peerId + * @param {number} start + * @param {number} length + * @returns {void} + */ + [kAddBlobWantRange](peerId, start, length) { + this[kSyncState].addBlobWantRange(peerId, start, length) + } + + /** + * Clear the blob blocks this peer wants. 
+ * + * @param {string} peerId + * @returns {void} + */ + [kClearBlobWantRanges](peerId) { + this[kSyncState].clearBlobWantRanges(peerId) + } + /** @type {import('../local-peers.js').LocalPeersEvents['discovery-key']} */ [kHandleDiscoveryKey](discoveryKey, protomux) { const peerSyncController = this.#peerSyncControllers.get(protomux) diff --git a/src/sync/sync-state.js b/src/sync/sync-state.js index 7e64836a0..d24ecfd42 100644 --- a/src/sync/sync-state.js +++ b/src/sync/sync-state.js @@ -68,6 +68,24 @@ export class SyncState extends TypedEmitter { ]) } + /** + * @param {string} peerId + * @param {number} start + * @param {number} length + * @returns {void} + */ + addBlobWantRange(peerId, start, length) { + this.#syncStates.blob.addWantRange(peerId, start, length) + } + + /** + * @param {string} peerId + * @returns {void} + */ + clearBlobWantRanges(peerId) { + this.#syncStates.blob.clearWantRanges(peerId) + } + #handleUpdate = () => { this.emit('state', this.getState()) } diff --git a/test-e2e/sync.js b/test-e2e/sync.js index 81c6cbb60..56a794608 100644 --- a/test-e2e/sync.js +++ b/test-e2e/sync.js @@ -187,6 +187,7 @@ test('non-archive devices only sync a subset of blobs', async (t) => { const fastifyController = new FastifyController({ fastify }) t.after(() => fastifyController.stop()) const invitee = createManager('invitee', t, { fastify }) + invitee.setIsArchiveDevice(false) const managers = [invitee, invitor] @@ -209,6 +210,9 @@ test('non-archive devices only sync a subset of blobs', async (t) => { const [invitorProject, inviteeProject] = projects const fixturesPath = new URL('../test/fixtures/', import.meta.url) + + // Test that only previews and thumbnails sync to non-archive devices + const imagesFixturesPath = new URL('images/', fixturesPath) const photoFixturePaths = { original: new URL('02-digidem-logo.jpg', imagesFixturesPath).pathname, @@ -233,12 +237,10 @@ test('non-archive devices only sync a subset of blobs', async (t) => { 
invitorProject.$sync.start()
   inviteeProject.$sync.start()
 
-  // TODO: We should replace this with `await waitForSync(projects, 'full')` once
-  // the following issues are merged:
-  //
-  // - 
-  // - 
-  await delay(2000)
+  await waitForSync(projects, 'full')
+
+  invitorProject.$sync.stop()
+  inviteeProject.$sync.stop()
 
   /**
    * @param {BlobId} blobId
@@ -278,6 +280,46 @@ test('non-archive devices only sync a subset of blobs', async (t) => {
       photoFixturePaths.thumbnail
     ),
   ])
+
+  // Devices can become archives again and get all the data
+
+  invitee.setIsArchiveDevice(true)
+
+  invitorProject.$sync.start()
+  inviteeProject.$sync.start()
+
+  await waitForSync(projects, 'full')
+
+  await Promise.all([
+    assertLoads(
+      { ...photoBlob, variant: 'original' },
+      photoFixturePaths.original
+    ),
+    assertLoads({ ...audioBlob, variant: 'original' }, audioFixturePath),
+  ])
+
+  // Devices can toggle whether they're an archive device while sync is running
+
+  invitee.setIsArchiveDevice(false)
+
+  const photoBlob2 = await invitorProject.$blobs.create(
+    photoFixturePaths,
+    blobMetadata({ mimeType: 'image/jpeg' })
+  )
+
+  await waitForSync(projects, 'full')
+
+  await Promise.all([
+    assert404({ ...photoBlob2, variant: 'original' }),
+    assertLoads(
+      { ...photoBlob2, type: 'photo', variant: 'preview' },
+      photoFixturePaths.preview
+    ),
+    assertLoads(
+      { ...photoBlob2, type: 'photo', variant: 'thumbnail' },
+      photoFixturePaths.thumbnail
+    ),
+  ])
 })
 
 test('start and stop sync', async function (t) {
diff --git a/test/lib/error.js b/test/lib/error.js
index c7de6ea98..56f9ccc6c 100644
--- a/test/lib/error.js
+++ b/test/lib/error.js
@@ -1,6 +1,10 @@
 import assert from 'node:assert/strict'
 import test, { describe } from 'node:test'
-import { ErrorWithCode, getErrorMessage } from '../../src/lib/error.js'
+import {
+  ErrorWithCode,
+  getErrorCode,
+  getErrorMessage,
+} from '../../src/lib/error.js'
 
 describe('ErrorWithCode', () => {
   test('ErrorWithCode with two arguments', () => {
@@ -22,6 +26,44 
@@ describe('ErrorWithCode', () => { }) }) +describe('getErrorCode', () => { + test('from values without a string code', () => { + class ErrorWithNumericCode extends Error { + code = 123 + } + + const testCases = [ + undefined, + null, + 'ignored', + { code: 'ignored' }, + new Error('has no code'), + new ErrorWithNumericCode(), + ] + + for (const testCase of testCases) { + assert.equal(getErrorCode(testCase), undefined) + } + }) + + test('from Errors with a string code', () => { + class ErrorWithInheritedCode extends Error { + get code() { + return 'foo' + } + } + + const testCases = [ + new ErrorWithCode('foo', 'message'), + new ErrorWithInheritedCode(), + ] + + for (const testCase of testCases) { + assert.equal(getErrorCode(testCase), 'foo') + } + }) +}) + describe('getErrorMessage', () => { test('from objects without a string message', () => { const testCases = [ diff --git a/test/sync/core-sync-state.js b/test/sync/core-sync-state.js index 3f3a45de0..4a7b13d3e 100644 --- a/test/sync/core-sync-state.js +++ b/test/sync/core-sync-state.js @@ -408,7 +408,7 @@ function createState({ have, prehave, want, status }) { // 53 because the max safe integer in JS is 53 bits for (let i = 0; i < 53; i++) { if ((bigInt >> BigInt(i)) & BigInt(1)) { - peerState.setWantRange({ start: i, length: 1 }) + peerState.addWantRange(i, 1) } } } @@ -480,16 +480,14 @@ async function downloadCore(core, bits) { function setPeerWants(state, peerId, bits) { if (typeof bits === 'undefined') return if (bits > Number.MAX_SAFE_INTEGER) throw new Error() + state.clearWantRanges(peerId) const bigInt = BigInt(bits) - /** @type {{ start: number, length: number}[]} */ - const ranges = [] // 53 because the max safe integer in JS is 53 bits for (let i = 0; i < 53; i++) { if ((bigInt >> BigInt(i)) & BigInt(1)) { - ranges.push({ start: i, length: 1 }) + state.addWantRange(peerId, i, 1) } } - state.setPeerWants(peerId, ranges) } /**