diff --git a/package-lock.json b/package-lock.json index f03f80113..7ec004e9e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -18,6 +18,7 @@ "@mapeo/crypto": "1.0.0-alpha.10", "@mapeo/sqlite-indexer": "1.0.0-alpha.9", "@sinclair/typebox": "^0.29.6", + "@sindresorhus/merge-streams": "^4.0.0", "b4a": "^1.6.3", "bcp-47": "^2.1.0", "better-sqlite3": "^8.7.0", @@ -55,6 +56,7 @@ "tiny-typed-emitter": "^2.1.0", "type-fest": "^4.5.0", "undici": "^6.13.0", + "unix-path-resolve": "^1.0.2", "varint": "^6.0.0", "ws": "^8.18.0", "yauzl-promise": "^4.0.0" @@ -1183,8 +1185,6 @@ "version": "4.0.0", "resolved": "https://registry.npmjs.org/@sindresorhus/merge-streams/-/merge-streams-4.0.0.tgz", "integrity": "sha512-tlqY9xq5ukxTUZBmoOp+m61cqwQD5pHJtFY3Mn8CA8ps6yghLH/Hw8UPdqg4OLmFW3IFlcXnQNmo/dh8HzXYIQ==", - "dev": true, - "license": "MIT", "engines": { "node": ">=18" }, diff --git a/package.json b/package.json index 02b3a3976..dced69c0a 100644 --- a/package.json +++ b/package.json @@ -166,6 +166,7 @@ "@mapeo/crypto": "1.0.0-alpha.10", "@mapeo/sqlite-indexer": "1.0.0-alpha.9", "@sinclair/typebox": "^0.29.6", + "@sindresorhus/merge-streams": "^4.0.0", "b4a": "^1.6.3", "bcp-47": "^2.1.0", "better-sqlite3": "^8.7.0", @@ -203,6 +204,7 @@ "tiny-typed-emitter": "^2.1.0", "type-fest": "^4.5.0", "undici": "^6.13.0", + "unix-path-resolve": "^1.0.2", "varint": "^6.0.0", "ws": "^8.18.0", "yauzl-promise": "^4.0.0" diff --git a/src/blob-store/downloader.js b/src/blob-store/downloader.js new file mode 100644 index 000000000..982a499ca --- /dev/null +++ b/src/blob-store/downloader.js @@ -0,0 +1,130 @@ +import { TypedEmitter } from 'tiny-typed-emitter' +import { createEntriesStream } from './entries-stream.js' +import { filePathMatchesFilter } from './utils.js' + +/** @import { BlobFilter } from '../types.js' */ +/** @import { THyperdriveIndex } from './hyperdrive-index.js' */ + +/** + * Like hyperdrive.download() but 'live', and for multiple drives. + * + * Will emit an 'error' event for any unexpected errors. A consumer must attach + * an error listener to avoid uncaught errors. Sources of errors include: + * + * - If the entries stream emits an error + * - If a drive referenced in an entry is not found + * - If core.has() throws (e.g. if hypercore is closed) + * - If core.download().done() throws, which should not happen according to + * current hypercore code. + * - If the entries stream ends unexpectedly (it should be live and not end) + * + * NB: unlike hyperdrive.download(), this will also download deleted and + * previous versions of blobs - we don't currently support editing or deleting + * of blobs, so this should not be an issue, and if we do in the future, + * downloading deleted and previous versions may be desirable behavior anyway + * + * @extends {TypedEmitter<{ error: (error: Error) => void }>} + */ +export class Downloader extends TypedEmitter { + /** @type {THyperdriveIndex} */ + #driveIndex + /** @type {Set<{ done(): Promise, destroy(): void }>} */ + #queuedDownloads = new Set() + #entriesStream + #processEntriesPromise + #ac = new AbortController() + #shouldDownloadFile + + /** + * @param {THyperdriveIndex} driveIndex + * @param {object} [options] + * @param {BlobFilter | null} [options.filter] Filter blobs of specific types and/or sizes to download + */ + constructor(driveIndex, { filter } = {}) { + super() + this.#driveIndex = driveIndex + + this.#shouldDownloadFile = filter + ? filePathMatchesFilter.bind(null, filter) + : () => true + + this.#entriesStream = createEntriesStream(driveIndex, { live: true }) + this.#entriesStream.once('error', this.#handleError) + + this.#ac.signal.addEventListener('abort', this.#handleAbort, { once: true }) + + this.#processEntriesPromise = this.#processEntries() + this.#processEntriesPromise.catch(this.#handleError) + } + + /** + * Start processing entries from the entries stream - if an entry matches the + * filter, and we don't already have it, queue it for download. If the + * Downloader is live, this method will never resolve, otherwise it will + * resolve when all the entries have been processed and downloaded. + */ + async #processEntries() { + for await (const entry of this.#entriesStream) { + this.#ac.signal.throwIfAborted() + const { + driveId, + key: filePath, + value: { blob }, + } = entry + if (!this.#shouldDownloadFile(filePath)) continue + const drive = this.#driveIndex.get(driveId) + // ERROR HANDLING: this is unexpected and should not happen + if (!drive) throw new Error('Drive not found: ' + driveId) + const blobs = await drive.getBlobs() + this.#ac.signal.throwIfAborted() + await this.#processEntry(blobs.core, blob) + this.#ac.signal.throwIfAborted() + } + throw new Error('Entries stream ended unexpectedly') + } + + /** + * Update state and queue missing entries for download + * + * @param {import('hypercore')} blobsCore + * @param {{ blockOffset: number, blockLength: number, byteLength: number }} blob + */ + async #processEntry(blobsCore, { blockOffset: start, blockLength: length }) { + const end = start + length + const have = await blobsCore.has(start, end) + this.#ac.signal.throwIfAborted() + if (have) return + const download = blobsCore.download({ start, end }) + this.#queuedDownloads.add(download) + download + .done() + // According to the code, this should never throw. + .catch(this.#handleError) + .finally(() => { + this.#queuedDownloads.delete(download) + }) + } + + /** + * Cancel the downloads and clean up resources. + */ + destroy() { + this.#ac.abort() + } + + /** @param {Error} error */ + #handleError = (error) => { + if (this.#ac.signal.aborted) return + this.emit('error', error) + this.#ac.abort(error) + } + + #handleAbort = () => { + for (const download of this.#queuedDownloads) download.destroy() + this.#ac.signal.removeEventListener('abort', this.#handleAbort) + this.#entriesStream.removeListener('error', this.#ac.abort) + // queuedDownloads is likely to be empty here anyway, but clear just in case. + this.#queuedDownloads.clear() + this.#entriesStream.destroy() + } +} diff --git a/src/blob-store/entries-stream.js b/src/blob-store/entries-stream.js new file mode 100644 index 000000000..deb3c6492 --- /dev/null +++ b/src/blob-store/entries-stream.js @@ -0,0 +1,81 @@ +import SubEncoder from 'sub-encoder' +import mergeStreams from '@sindresorhus/merge-streams' +import { Transform, pipeline } from 'node:stream' +import { noop } from '../utils.js' + +/** @import Hyperdrive from 'hyperdrive' */ +/** @import { BlobStoreEntriesStream } from '../types.js' */ +/** @import { THyperdriveIndex } from './hyperdrive-index.js' */ + +const keyEncoding = new SubEncoder('files', 'utf-8') + +/** + * + * @param {THyperdriveIndex} driveIndex + * @param {object} opts + * @param {boolean} [opts.live=false] + * @returns {BlobStoreEntriesStream} + */ +export function createEntriesStream(driveIndex, { live = false } = {}) { + const mergedEntriesStreams = mergeStreams( + [...driveIndex].map((drive) => getHistoryStream(drive.db, { live })) + ) + driveIndex.on('add-drive', addDrive) + // Close is always emitted, so we can use it to remove the listener + mergedEntriesStreams.once('close', () => + driveIndex.off('add-drive', addDrive) + ) + return mergedEntriesStreams + + /** @param {Hyperdrive} drive */ + function addDrive(drive) { + mergedEntriesStreams.add(getHistoryStream(drive.db, { live })) + } +} + +/** + * + * @param {import('hyperbee')} bee + * @param {object} opts + * @param {boolean} opts.live + */ +function getHistoryStream(bee, { live }) { + // This will also include old versions of files, but it is the only way to + // get a live stream from a Hyperbee, however we currently do not support + // edits of blobs, so this should not be an issue, and the consequence is + // that old versions are downloaded too, which is acceptable. + const historyStream = bee.createHistoryStream({ + live, + // `keyEncoding` is necessary because hyperdrive stores file index data + // under the `files` sub-encoding key + keyEncoding, + }) + return pipeline(historyStream, new AddDriveIds(bee.core), noop) +} + +class AddDriveIds extends Transform { + #core + #cachedDriveId + + /** @param {import('hypercore')} core */ + constructor(core) { + super({ objectMode: true }) + this.#core = core + this.#cachedDriveId = core.discoveryKey?.toString('hex') + } + + /** @type {Transform['_transform']} */ + _transform(entry, _, callback) { + // Minimal performance optimization to only call toString() once. + // core.discoveryKey will always be defined by the time it starts + // streaming, but could be null when the instance is first created. + let driveId + if (this.#cachedDriveId) { + driveId = this.#cachedDriveId + } else { + driveId = this.#core.discoveryKey?.toString('hex') + this.#cachedDriveId = driveId + } + callback(null, { ...entry, driveId }) + } +} diff --git a/src/blob-store/hyperdrive-index.js b/src/blob-store/hyperdrive-index.js new file mode 100644 index 000000000..0f239bdfd --- /dev/null +++ b/src/blob-store/hyperdrive-index.js @@ -0,0 +1,122 @@ +import b4a from 'b4a' +import { discoveryKey } from 'hypercore-crypto' +import Hyperdrive from 'hyperdrive' +import util from 'node:util' +import { TypedEmitter } from 'tiny-typed-emitter' + +/** @typedef {HyperdriveIndexImpl} THyperdriveIndex */ + +/** + * @extends {TypedEmitter<{ 'add-drive': (drive: Hyperdrive) => void }>} + */ +export class HyperdriveIndexImpl extends TypedEmitter { + /** @type {Map} */ + #hyperdrives = new Map() + #writer + #writerKey + /** @param {import('../core-manager/index.js').CoreManager} coreManager */ + constructor(coreManager) { + super() + /** @type {undefined | Hyperdrive} */ + let writer + const corestore = new PretendCorestore({ coreManager }) + const blobIndexCores = coreManager.getCores('blobIndex') + const writerCoreRecord = coreManager.getWriterCore('blobIndex') + this.#writerKey = writerCoreRecord.key + for (const { key } of blobIndexCores) { + // @ts-ignore - we know pretendCorestore is not actually a Corestore + const drive = new Hyperdrive(corestore, key) + // We use the discovery key to derive the id for a drive + this.#hyperdrives.set(getDiscoveryId(key), drive) + if (key.equals(this.#writerKey)) { + writer = drive + } + } + if (!writer) { + throw new Error('Could not find a writer for the blobIndex namespace') + } + this.#writer = writer + + coreManager.on('add-core', ({ key, namespace }) => { + if (namespace !== 'blobIndex') return + // We use the discovery key to derive the id for a drive + const driveId = getDiscoveryId(key) + if (this.#hyperdrives.has(driveId)) return + // @ts-ignore - we know pretendCorestore is not actually a Corestore + const drive = new Hyperdrive(corestore, key) + this.#hyperdrives.set(driveId, drive) + this.emit('add-drive', drive) + }) + } + get writer() { + return this.#writer + } + get writerKey() { + return this.#writerKey + } + [Symbol.iterator]() { + return this.#hyperdrives.values() + } + /** @param {string} driveId */ + get(driveId) { + return this.#hyperdrives.get(driveId) + } +} + +/** + * Implements the `get()` method as used by hyperdrive-next. It returns the + * relevant cores from the Mapeo CoreManager. + */ +class PretendCorestore { + #coreManager + /** + * @param {object} options + * @param {import('../core-manager/index.js').CoreManager} options.coreManager + */ + constructor({ coreManager }) { + this.#coreManager = coreManager + } + + /** + * @param {Buffer | { publicKey: Buffer } | { name: string }} opts + * @returns {import('hypercore')<"binary", Buffer> | undefined} + */ + get(opts) { + if (b4a.isBuffer(opts)) { + opts = { publicKey: opts } + } + if ('key' in opts) { + // @ts-ignore + opts.publicKey = opts.key + } + if ('publicKey' in opts) { + // NB! We should always add blobIndex (Hyperbee) cores to the core manager + // before we use them here. We would only reach the addCore path if the + // blob core is read from the hyperbee header (before it is added to the + // core manager) + return ( + this.#coreManager.getCoreByKey(opts.publicKey) || + this.#coreManager.addCore(opts.publicKey, 'blob').core + ) + } else if (opts.name === 'db') { + return this.#coreManager.getWriterCore('blobIndex').core + } else if (opts.name.includes('blobs')) { + return this.#coreManager.getWriterCore('blob').core + } else { + throw new Error( + 'Unsupported corestore.get() with opts ' + util.inspect(opts) + ) + } + } + + /** no-op */ + close() {} +} + +/** + * @param {Buffer} key Public key of hypercore + * @returns {string} Hex-encoded string of derived discovery key + */ +function getDiscoveryId(key) { + return discoveryKey(key).toString('hex') +} diff --git a/src/blob-store/index.js b/src/blob-store/index.js index c1a1761f4..d03a9e176 100644 --- a/src/blob-store/index.js +++ b/src/blob-store/index.js @@ -1,22 +1,23 @@ -import Hyperdrive from 'hyperdrive' -import b4a from 'b4a' -import util from 'node:util' +import { pipeline } from 'node:stream' import { discoveryKey } from 'hypercore-crypto' +import { Downloader } from './downloader.js' +import { createEntriesStream } from './entries-stream.js' +import { FilterEntriesStream } from './utils.js' +import { noop } from '../utils.js' import { TypedEmitter } from 'tiny-typed-emitter' -import { LiveDownload } from './live-download.js' +import { HyperdriveIndexImpl as HyperdriveIndex } from './hyperdrive-index.js' + +/** @import Hyperdrive from 'hyperdrive' */ /** @import { JsonObject } from 'type-fest' */ /** @import { Readable as NodeReadable } from 'node:stream' */ /** @import { Readable as StreamxReadable, Writable } from 'streamx' */ -/** @import { BlobId } from '../types.js' */ -/** @import { BlobDownloadEvents } from './live-download.js' */ +/** @import { BlobFilter, BlobId, BlobStoreEntriesStream } from '../types.js' */ /** * @internal * @typedef {NodeReadable | StreamxReadable} Readable */ -/** @typedef {TypedEmitter<{ 'add-drive': (drive: import('hyperdrive')) => void }>} InternalDriveEmitter */ - // prop = blob type name // value = array of blob variants supported for that type const SUPPORTED_BLOB_VARIANTS = /** @type {const} */ ({ @@ -37,57 +38,31 @@ class ErrNotFound extends Error { } } -export class BlobStore { - /** @type {Map} Indexed by hex-encoded discovery key */ - #hyperdrives = new Map() - #writer - /** - * Used to communicate to live download instances when new drives are added - * @type {InternalDriveEmitter} - */ - #driveEmitter = new TypedEmitter() +/** @extends {TypedEmitter<{ error: (error: Error) => void }>} */ +export class BlobStore extends TypedEmitter { + #driveIndex + /** @type {Downloader} */ + #downloader /** * @param {object} options * @param {import('../core-manager/index.js').CoreManager} options.coreManager + * @param {BlobFilter | null} options.downloadFilter - Filter blob types and/or variants to download. Set to `null` to download all blobs. */ - constructor({ coreManager }) { - /** @type {undefined | (Hyperdrive & { key: Buffer })} */ - let writer - const corestore = new PretendCorestore({ coreManager }) - const blobIndexCores = coreManager.getCores('blobIndex') - const { key: writerKey } = coreManager.getWriterCore('blobIndex') - for (const { key } of blobIndexCores) { - // @ts-ignore - we know pretendCorestore is not actually a Corestore - const drive = new Hyperdrive(corestore, key) - // We use the discovery key to derive the id for a drive - this.#hyperdrives.set(getDiscoveryId(key), drive) - if (key.equals(writerKey)) { - writer = proxyProps(drive, { key: writerKey }) - } - } - if (!writer) { - throw new Error('Could not find a writer for the blobIndex namespace') - } - this.#writer = writer - - coreManager.on('add-core', ({ key, namespace }) => { - if (namespace !== 'blobIndex') return - // We use the discovery key to derive the id for a drive - const driveId = getDiscoveryId(key) - if (this.#hyperdrives.has(driveId)) return - // @ts-ignore - we know pretendCorestore is not actually a Corestore - const drive = new Hyperdrive(corestore, key) - this.#hyperdrives.set(driveId, drive) - this.#driveEmitter.emit('add-drive', drive) + constructor({ coreManager, downloadFilter }) { + super() + this.#driveIndex = new HyperdriveIndex(coreManager) + this.#downloader = new Downloader(this.#driveIndex, { + filter: downloadFilter, }) + this.#downloader.on('error', (error) => this.emit('error', error)) } /** * @returns {string} */ get writerDriveId() { - return getDiscoveryId(this.#writer.key) + return getDiscoveryId(this.#driveIndex.writerKey) } /** @@ -95,7 +70,7 @@ export class BlobStore { * @returns {Hyperdrive} */ #getDrive(driveId) { - const drive = this.#hyperdrives.get(driveId) + const drive = this.#driveIndex.get(driveId) if (!drive) throw new Error('Drive not found ' + driveId.slice(0, 7)) return drive } @@ -116,23 +91,18 @@ export class BlobStore { } /** - * Download blobs from all drives, optionally filtering particular blob types - * or blob variants. Download will be 'live' and will continue downloading new - * data as it becomes available from any replicating drive. + * Set the filter for downloading blobs. * - * If no filter is specified, all blobs will be downloaded. If a filter is - * specified, then _only_ blobs that match the filter will be downloaded. - * - * @param {import('../types.js').BlobFilter} [filter] Filter blob types and/or variants to download. Filter is { [BlobType]: BlobVariants[] }. At least one blob variant must be specified for each blob type. - * @param {object} options - * @param {AbortSignal} [options.signal] Optional AbortSignal to cancel in-progress download - * @returns {TypedEmitter} + * @param {import('../types.js').BlobFilter | null} filter Filter blob types and/or variants to download. Filter is { [BlobType]: BlobVariants[] }. At least one blob variant must be specified for each blob type. + * @returns {void} */ - download(filter, { signal } = {}) { - return new LiveDownload(this.#hyperdrives.values(), this.#driveEmitter, { + setDownloadFilter(filter) { + this.#downloader.removeAllListeners() + this.#downloader.destroy() + this.#downloader = new Downloader(this.#driveIndex, { filter, - signal, }) + this.#downloader.on('error', (error) => this.emit('error', error)) } /** @@ -154,6 +124,22 @@ export class BlobStore { return drive.createReadStream(path, options) } + /** + * This is a low-level method to create a stream of entries from all drives. + * It includes entries for unknown blob types and variants. + * + * @param {object} opts + * @param {boolean} [opts.live=false] Set to `true` to get a live stream of entries + * @param {import('./utils.js').GenericBlobFilter | null} [opts.filter] Filter blob types and/or variants in returned entries. Filter is { [BlobType]: BlobVariants[] }. + * @returns {BlobStoreEntriesStream} + */ + createEntriesReadStream({ live = false, filter } = {}) { + const entriesStream = createEntriesStream(this.#driveIndex, { live }) + if (!filter) return entriesStream + const filterStream = new FilterEntriesStream(filter) + return pipeline(entriesStream, filterStream, noop) + } + /** * Optimization for creating the blobs read stream when you have * previously read the entry from Hyperdrive using `drive.entry` @@ -163,7 +149,7 @@ export class BlobStore { * @param {boolean} [options.wait=false] Set to `true` to wait for a blob to download, otherwise will throw if blob is not available locally * @returns {Promise} */ - async createEntryReadStream(driveId, entry, options = { wait: false }) { + async createReadStreamFromEntry(driveId, entry, options = { wait: false }) { const drive = this.#getDrive(driveId) const blobs = await drive.getBlobs() @@ -206,7 +192,7 @@ export class BlobStore { */ async put({ type, variant, name }, blob, options) { const path = makePath({ type, variant, name }) - await this.#writer.put(path, blob, options) + await this.#driveIndex.writer.put(path, blob, options) return this.writerDriveId } @@ -218,7 +204,7 @@ export class BlobStore { */ createWriteStream({ type, variant, name }, options) { const path = makePath({ type, variant, name }) - const stream = this.#writer.createWriteStream(path, options) + const stream = this.#driveIndex.writer.createWriteStream(path, options) return proxyProps(stream, { driveId: this.writerDriveId, }) @@ -236,7 +222,7 @@ export class BlobStore { { type, variant, name, driveId }, options = { follow: false, wait: false } ) { - const drive = this.#hyperdrives.get(driveId) + const drive = this.#driveIndex.get(driveId) if (!drive) throw new Error('Drive not found ' + driveId.slice(0, 7)) const path = makePath({ type, variant, name }) const entry = await drive.entry(path, options) @@ -255,6 +241,11 @@ export class BlobStore { return drive.clear(path, options) } + + close() { + this.#downloader.removeAllListeners() + this.#downloader.destroy() + } } /** @@ -282,56 +273,6 @@ function makePath({ type, variant, name }) { return `/${type}/${variant}/${name}` } -/** - * Implements the `get()` method as used by hyperdrive-next. It returns the - * relevant cores from the Mapeo CoreManager. - */ -class PretendCorestore { - #coreManager - /** - * @param {object} options - * @param {import('../core-manager/index.js').CoreManager} options.coreManager - */ - constructor({ coreManager }) { - this.#coreManager = coreManager - } - - /** - * @param {Buffer | { publicKey: Buffer } | { name: string }} opts - * @returns {import('hypercore')<"binary", Buffer> | undefined} - */ - get(opts) { - if (b4a.isBuffer(opts)) { - opts = { publicKey: opts } - } - if ('key' in opts) { - // @ts-ignore - opts.publicKey = opts.key - } - if ('publicKey' in opts) { - // NB! We should always add blobIndex (Hyperbee) cores to the core manager - // before we use them here. We would only reach the addCore path if the - // blob core is read from the hyperbee header (before it is added to the - // core manager) - return ( - this.#coreManager.getCoreByKey(opts.publicKey) || - this.#coreManager.addCore(opts.publicKey, 'blob').core - ) - } else if (opts.name === 'db') { - return this.#coreManager.getWriterCore('blobIndex').core - } else if (opts.name.includes('blobs')) { - return this.#coreManager.getWriterCore('blob').core - } else { - throw new Error( - 'Unsupported corestore.get() with opts ' + util.inspect(opts) - ) - } - } - - /** no-op */ - close() {} -} - /** * @param {Buffer} key Public key of hypercore * @returns {string} Hex-encoded string of derived discovery key diff --git a/src/blob-store/live-download.js b/src/blob-store/live-download.js deleted file mode 100644 index 0b8ac51c7..000000000 --- a/src/blob-store/live-download.js +++ /dev/null @@ -1,373 +0,0 @@ -import { TypedEmitter } from 'tiny-typed-emitter' -import { once } from 'node:events' -import SubEncoder from 'sub-encoder' - -const keyEncoding = new SubEncoder('files', 'utf-8') - -/** - * @typedef {object} BlobDownloadState - * @property {number} haveCount The number of files already downloaded - * @property {number} haveBytes The bytes already downloaded - * @property {number} wantCount The number of files pending download - * @property {number} wantBytes The bytes pending download - * @property {null} error If status = 'error' then this will be an Error object - * @property {'checking' | 'downloading' | 'downloaded' | 'aborted'} status - */ - -/** @typedef {Omit & { status: 'error', error: Error }} BlobDownloadStateError */ - -/** - * @typedef {object} BlobDownloadEvents - * @property {(state: BlobDownloadState | BlobDownloadStateError ) => void} state Emitted with the current download state whenever it changes (not emitted during initial 'checking' status) - */ - -/** - * LiveDownload class - * @extends {TypedEmitter} - */ -export class LiveDownload extends TypedEmitter { - /** @type {Set} */ - #driveLiveDownloads = new Set() - #signal - - /** - * Like drive.download() but 'live', and for multiple drives - * @param {Iterable} drives - * @param {import('./index.js').InternalDriveEmitter} emitter - * @param {object} options - * @param {import('../types.js').BlobFilter} [options.filter] Filter blobs of specific types and/or sizes to download - * @param {AbortSignal} [options.signal] - */ - constructor(drives, emitter, { filter, signal }) { - super() - this.#signal = signal - - const emitState = () => { - this.emit('state', this.state) - } - - /** @param {import('hyperdrive')} drive */ - const addDrive = (drive) => { - const download = new DriveLiveDownload(drive, { - filter, - signal, - }) - this.#driveLiveDownloads.add(download) - download.on('state', emitState) - } - - for (const drive of drives) addDrive(drive) - emitter.on('add-drive', addDrive) - - signal?.addEventListener( - 'abort', - () => { - emitter.off('add-drive', addDrive) - for (const download of this.#driveLiveDownloads) { - download.off('state', emitState) - } - }, - { once: true } - ) - } - - /** - * @returns {BlobDownloadState | BlobDownloadStateError} - */ - get state() { - return combineStates(this.#driveLiveDownloads, { signal: this.#signal }) - } -} - -/** - * LiveDownload class - * @extends {TypedEmitter} - */ -export class DriveLiveDownload extends TypedEmitter { - #haveCount = 0 - #haveBytes = 0 - #wantBytes = 0 - #initialCheck = true - #drive - #folders - /** @type {Set<{ done(): Promise, destroy(): void }>} */ - #downloads = new Set() - /** @type {Error | null} */ - #error = null - #signal - - /** - * Like drive.download() but 'live', - * @param {import('hyperdrive')} drive - * @param {object} options - * @param {import('../types.js').BlobFilter} [options.filter] Filter blobs of specific types and/or sizes to download - * @param {AbortSignal} [options.signal] - */ - constructor(drive, { filter, signal } = {}) { - super() - this.#drive = drive - this.#folders = filterToFolders(filter) - this.#signal = signal - if (signal && !signal.aborted) { - signal.addEventListener( - 'abort', - () => { - for (const download of this.#downloads) download.destroy() - this.#downloads.clear() - this.emit('state', this.state) - }, - { once: true } - ) - } - this.#start().catch(this.#handleError.bind(this)) - } - - /** - * @returns {BlobDownloadState | BlobDownloadStateError} - */ - get state() { - if (this.#error) { - return { - haveCount: this.#haveCount, - haveBytes: this.#haveBytes, - wantCount: this.#downloads.size, - wantBytes: this.#wantBytes, - error: this.#error, - status: 'error', - } - } - return { - haveCount: this.#haveCount, - haveBytes: this.#haveBytes, - wantCount: this.#downloads.size, - wantBytes: this.#wantBytes, - error: null, - status: this.#signal?.aborted - ? 'aborted' - : this.#initialCheck - ? 'checking' - : this.#downloads.size > 0 - ? 'downloading' - : 'downloaded', - } - } - - async #start() { - const blobsCore = await this.#getBlobsCore() - /* c8 ignore next */ - if (this.#signal?.aborted || !blobsCore) return // Can't get here in tests - let seq = 0 - - for (const folder of this.#folders) { - // Don't emit state during initial iteration of existing data, since this is - // likely fast and not useful UX feedback - const entryStream = this.#drive.list(folder, { recursive: true }) - if (this.#signal) { - this.#signal.addEventListener('abort', () => entryStream.destroy(), { - once: true, - }) - } - for await (const entry of entryStream) { - if (this.#signal?.aborted) return - seq = Math.max(seq, entry.seq) - const { blob } = entry.value - if (!blob) continue - await this.#processEntry(blobsCore, blob) - } - if (this.#signal?.aborted) return - } - - this.#initialCheck = false - this.emit('state', this.state) - - const bee = this.#drive.db - // This will also download old versions of files, but it is the only way to - // get a live stream from a Hyperbee, however we currently do not support - // edits of blobs, so this should not be an issue. `keyEncoding` is - // necessary because hyperdrive stores file index data under the `files` - // sub-encoding key - const historyStream = bee.createHistoryStream({ - live: true, - gt: seq, - keyEncoding, - }) - if (this.#signal) { - this.#signal.addEventListener('abort', () => historyStream.destroy(), { - once: true, - }) - } - for await (const entry of historyStream) { - if (this.#signal?.aborted) return - const { blob } = entry.value - if (!blob) continue - if (!matchesFolder(entry.key, this.#folders)) continue - // TODO: consider cancelling downloads when a delete entry is found? - // Probably not worth the extra work. - if (entry.type !== 'put') continue - const wasDownloaded = this.state.status === 'downloaded' - await this.#processEntry(blobsCore, blob) - if (wasDownloaded && this.state.status === 'downloading') { - // State has changed, so emit - this.emit('state', this.state) - } - } - /* c8 ignore next 2 */ - // Could possibly reach here if aborted after check in loop, hard to test - this.emit('state', this.state) - } - - /** - * If a Hyperdrive has been added by its key and has never replicated, then - * drive.getBlobs() will not resolve until replication starts. Since we do not - * want the downloader to remain in the "checking" state forever, we catch - * this case and update the state before waiting for the hyperdrive hyperblobs - * instance. This also makes waiting for the blobs instance cancellable. - * - * @returns {Promise} - */ - async #getBlobsCore() { - if (this.#drive.blobs) return this.#drive.blobs.core - await this.#drive.ready() - await this.#drive.core.update({ wait: true }) - - // If no peers at this stage, we are not going to be able to get the blobs - // until a peer appears, so consider this state "downloaded", because - // otherwise this will just hang as "checking" - if (!this.#drive.core.peers.length) { - this.#initialCheck = false - this.emit('state', this.state) - } - try { - const [blobs] = await once(this.#drive, 'blobs', { signal: this.#signal }) - return blobs.core - } catch (e) { - if (e instanceof Error && e.name === 'AbortError') return - throw e - } - } - - /** @param {Error} e */ - #handleError(e) { - this.#error = e - this.emit('state', this.state) - } - - /** - * Update state and queue missing entries for download - * - * @param {import('hypercore')} core - * @param {{ blockOffset: number, blockLength: number, byteLength: number }} blob - */ - async #processEntry( - core, - { blockOffset: start, blockLength: length, byteLength } - ) { - const end = start + length - const have = await core.has(start, end) - if (have) { - this.#haveCount++ - this.#haveBytes += byteLength - } else { - this.#wantBytes += byteLength - const download = core.download({ start, end }) - this.#downloads.add(download) - download - .done() - .then(() => { - this.#downloads.delete(download) - this.#haveCount++ - this.#haveBytes += byteLength - this.#wantBytes -= byteLength - this.emit('state', this.state) - }) - .catch(this.#handleError.bind(this)) - } - } -} - -/** - * Reduce multiple states into one. Factored out for unit testing because I - * don't trust my coding. Probably a smarter way to do this, but this works. - * - * @param {Iterable<{ state: BlobDownloadState | BlobDownloadStateError }>} liveDownloads - * @param {{ signal?: AbortSignal }} options - * @returns {BlobDownloadState | BlobDownloadStateError} - */ -export function combineStates(liveDownloads, { signal } = {}) { - /** @type {BlobDownloadState | BlobDownloadStateError} */ - let combinedState = { - haveCount: 0, - haveBytes: 0, - wantCount: 0, - wantBytes: 0, - error: null, - status: 'downloaded', - } - for (const { state } of liveDownloads) { - combinedState.haveCount += state.haveCount - combinedState.haveBytes += state.haveBytes - combinedState.wantCount += state.wantCount - combinedState.wantBytes += state.wantBytes - if (state.status === combinedState.status) continue - if (state.status === 'error') { - combinedState = { ...combinedState, error: state.error, status: 'error' } - } else if ( - state.status === 'downloading' && - combinedState.status === 'downloaded' - ) { - combinedState = { ...combinedState, status: 'downloading' } - } else if ( - state.status === 'checking' && - (combinedState.status === 'downloaded' || - combinedState.status === 'downloading') - ) { - combinedState = { ...combinedState, status: 'checking' } - } - } - if (signal?.aborted) { - combinedState.status = 'aborted' - } - return combinedState -} - -/** - * Convert a filter to an array of folders that need to be downloaded - * - * @param {import('../types.js').BlobFilter} [filter] - * @returns {string[]} array of folders that match the filter - */ -function filterToFolders(filter) { - if (!filter) return ['/'] - const folders = [] - for (const [ - type, - variants, - ] of /** @type {import('type-fest').Entries} */ ( - Object.entries(filter) - )) { - // De-dupe variants array - for (const variant of new Set(variants)) { - folders.push(makePath({ type, variant })) - } - } - return folders -} - -/** - * Returns true if the path is within one of the given folders - * - * @param {string} path - * @param {string[]} folders - * @returns {boolean} - */ -function matchesFolder(path, folders) { - for (const folder of folders) { - if (path.startsWith(folder)) return true - } - return false -} - -/** @param {Pick} opts */ -function makePath({ type, variant }) { - return `/${type}/${variant}` -} diff --git a/src/blob-store/utils.js b/src/blob-store/utils.js new file mode 100644 index 000000000..f9f9e685e --- /dev/null +++ b/src/blob-store/utils.js @@ -0,0 +1,54 @@ +/** + * This is a more generic version of the BlobFilter type that can filter unknown + * blob types and variants from the blob store. + * + * @typedef {{ [type: string]: readonly string[] }} GenericBlobFilter + */ + +import { Transform } from 'node:stream' + +/** + * @param {GenericBlobFilter} filter + * @param {string} filePath + * @returns {boolean} + */ +export function filePathMatchesFilter(filter, filePath) { + const pathParts = filePath.split('/', 4) + const [shouldBeEmpty, type, variant] = pathParts + + if (typeof shouldBeEmpty !== 'string' || shouldBeEmpty) return false + + if (!type) return false + if (!Object.hasOwn(filter, type)) return false + + const allowedVariants = filter[type] ?? [] + if (allowedVariants.length === 0) { + return pathParts.length >= 3 + } else { + return ( + pathParts.length >= 4 && + typeof variant === 'string' && + allowedVariants.includes(variant) + ) + } +} + +/** @type {import("../types.js").BlobStoreEntriesStream} */ +export class FilterEntriesStream extends Transform { + #isIncludedInFilter + /** @param {GenericBlobFilter} filter */ + constructor(filter) { + super({ objectMode: true }) + this.#isIncludedInFilter = filePathMatchesFilter.bind(null, filter) + } + /** + * @param {import("hyperdrive").HyperdriveEntry} entry + * @param {Parameters[1]} _ + * @param {Parameters[2]} callback + */ + _transform(entry, _, callback) { + const { key: filePath } = entry + if (this.#isIncludedInFilter(filePath)) this.push(entry) + callback() + } +} diff --git a/src/core-manager/index.js b/src/core-manager/index.js index 35f977794..c119cdee8 100644 --- a/src/core-manager/index.js +++ b/src/core-manager/index.js @@ -298,7 +298,8 @@ export class CoreManager extends TypedEmitter { keyPair, encryptionKey: this.#encryptionKeys[namespace], }) - if (this.#autoDownload) { + if (this.#autoDownload && namespace !== 'blob') { + // Blob downloads are managed by BlobStore core.download({ start: 0, end: -1 }) } // Every peer adds a listener, so could have many peers diff --git a/src/discovery/local-discovery.js b/src/discovery/local-discovery.js index 5fdbf11e7..5f1574a11 100644 --- a/src/discovery/local-discovery.js +++ b/src/discovery/local-discovery.js @@ -9,6 +9,7 @@ import StartStopStateMachine from 'start-stop-state-machine' import pTimeout from 'p-timeout' import { keyToPublicId } from '@mapeo/crypto' import { Logger } from '../logger.js' +import { getErrorCode } from '../lib/error.js' /** @import { OpenedNoiseStream } from '../lib/noise-secret-stream-helpers.js' */ /** @typedef {{ publicKey: Buffer, secretKey: Buffer }} Keypair */ @@ -117,7 +118,7 @@ export class LocalDiscovery extends TypedEmitter { /** @param {Error} e */ function onSocketError(e) { - if ('code' in e && e.code === 'EPIPE') { + if (getErrorCode(e) === 'EPIPE') { socket.destroy() if (secretStream) { secretStream.destroy() diff --git a/src/fastify-plugins/blobs.js b/src/fastify-plugins/blobs.js index 894595c60..65b00896c 100644 --- a/src/fastify-plugins/blobs.js +++ b/src/fastify-plugins/blobs.js @@ -1,9 +1,11 @@ import fp from 'fastify-plugin' import { filetypemime } from 'magic-bytes.js' +import { pEvent } from 'p-event' import { Type as T } from '@sinclair/typebox' import { SUPPORTED_BLOB_VARIANTS } from '../blob-store/index.js' import { HEX_REGEX_32_BYTES, Z_BASE_32_REGEX_32_BYTES } from './constants.js' +import { getErrorMessage } from '../lib/error.js' /** @import { BlobId } from '../types.js' */ @@ -93,12 +95,25 @@ async function routes(fastify, options) { let blobStream try { - blobStream = await blobStore.createEntryReadStream(driveId, entry) + blobStream = await blobStore.createReadStreamFromEntry(driveId, entry) } catch (e) { reply.code(404) throw e } + try { + await pEvent(blobStream, 'readable', { rejectionEvents: ['error'] }) + } catch (err) { + // This matches [how Hyperblobs checks if a blob is unavailable][0]. + // [0]: https://github.com/holepunchto/hyperblobs/blob/518088d2b828082fd70a276fa2c8848a2cf2a56b/index.js#L49 + if (getErrorMessage(err) === 'Block not available') { + reply.code(404) + throw new Error('Blob not found') + } else { + throw err + } + } + // Extract the 'mimeType' property of the metadata and use it for the response header if found if ( metadata && diff --git a/src/fastify-plugins/maps.js b/src/fastify-plugins/maps.js index d3df66e66..04cf1dc19 100644 --- a/src/fastify-plugins/maps.js +++ b/src/fastify-plugins/maps.js @@ -5,6 +5,7 @@ import { ReaderWatch, Server as SMPServerPlugin } from 'styled-map-package' import { noop } from '../utils.js' import { NotFoundError, ENOENTError } from './utils.js' +import { getErrorCode } from '../lib/error.js' /** @import { FastifyPluginAsync } from 'fastify' */ /** @import { Stats } from 'node:fs' */ @@ -56,7 +57,7 @@ export async function plugin(fastify, opts) { try { stats = await fs.stat(customMapPath) } catch (err) { - if (err instanceof Error && 'code' in err && err.code === 'ENOENT') { + if (getErrorCode(err) === 'ENOENT') { throw new ENOENTError(customMapPath) } diff --git a/src/lib/error.js b/src/lib/error.js index 41cbe5544..61436dd22 100644 --- a/src/lib/error.js +++ b/src/lib/error.js @@ -21,6 +21,30 @@ export class ErrorWithCode extends Error { } } +/** + * If the argument is an `Error` instance, return its `code` property if it is a string. + * Otherwise, returns `undefined`. + * + * @param {unknown} maybeError + * @returns {undefined | string} + * @example + * try { + * // do something + * } catch (err) { + * console.error(getErrorCode(err)) + * } + */ +export function getErrorCode(maybeError) { + if ( + maybeError instanceof Error && + 'code' in maybeError && + typeof maybeError.code === 'string' + ) { + return maybeError.code + } + return undefined +} + /** * Get the error message from an object if possible. * Otherwise, stringify the argument. diff --git a/src/mapeo-project.js b/src/mapeo-project.js index dcb719373..c8d703310 100644 --- a/src/mapeo-project.js +++ b/src/mapeo-project.js @@ -49,7 +49,10 @@ import { omit } from './lib/omit.js' import { MemberApi } from './member-api.js' import { SyncApi, + kAddBlobWantRange, + kClearBlobWantRanges, kHandleDiscoveryKey, + kSetBlobDownloadFilter, kWaitForInitialSyncWithPeer, } from './sync/sync-api.js' import { Logger } from './logger.js' @@ -57,8 +60,9 @@ import { IconApi } from './icon-api.js' import { readConfig } from './config-import.js' import TranslationApi from './translation-api.js' import { NotFoundError } from './errors.js' +import { getErrorCode, getErrorMessage } from './lib/error.js' /** @import { ProjectSettingsValue } from '@comapeo/schema' */ -/** @import { CoreStorage, KeyPair, Namespace, ReplicationStream } from './types.js' */ +/** @import { CoreStorage, BlobFilter, BlobStoreEntriesStream, KeyPair, Namespace, ReplicationStream } from './types.js' */ /** @typedef {Omit} EditableProjectSettings */ /** @typedef {ProjectSettingsValue['configMetadata']} ConfigMetadata */ @@ -78,6 +82,13 @@ export const kIsArchiveDevice = Symbol('isArchiveDevice (temp - test only)') const EMPTY_PROJECT_SETTINGS = Object.freeze({}) +/** @type {import('./types.js').BlobFilter} */ +const NON_ARCHIVE_DEVICE_DOWNLOAD_FILTER = { + photo: ['preview', 'thumbnail'], + // Don't download any audio of video files, since previews and + // thumbnails aren't supported yet. +} + /** * @extends {TypedEmitter<{ close: () => void }>} */ @@ -147,6 +158,8 @@ export class MapeoProject extends TypedEmitter { const getReplicationStream = this[kProjectReplicate].bind(this, true) + const blobDownloadFilter = getBlobDownloadFilter(isArchiveDevice) + ///////// 1. Setup database this.#sqlite = new Database(dbPath) @@ -363,6 +376,13 @@ export class MapeoProject extends TypedEmitter { this.#blobStore = new BlobStore({ coreManager: this.#coreManager, + downloadFilter: blobDownloadFilter, + }) + + this.#blobStore.on('error', (err) => { + // TODO: Handle this error in some way - this error will come from an + // unexpected error with background blob downloads + console.error('BlobStore error', err) }) this.$blobs = new BlobApi({ @@ -392,7 +412,7 @@ export class MapeoProject extends TypedEmitter { coreManager: this.#coreManager, coreOwnership: this.#coreOwnership, roles: this.#roles, - blobDownloadFilter: null, + blobDownloadFilter, logger: this.#l, getServerWebsocketUrls: async () => { const members = await this.#memberApi.getMany() @@ -414,6 +434,48 @@ export class MapeoProject extends TypedEmitter { getReplicationStream, }) + /** @type {Map} */ + const entriesReadStreams = new Map() + + this.#coreManager.on('peer-download-intent', async (filter, peerId) => { + entriesReadStreams.get(peerId)?.destroy() + + const entriesReadStream = this.#blobStore.createEntriesReadStream({ + live: true, + filter, + }) + entriesReadStreams.set(peerId, entriesReadStream) + + entriesReadStream.once('close', () => { + if (entriesReadStreams.get(peerId) === entriesReadStream) { + entriesReadStreams.delete(peerId) + } + }) + + this.#syncApi[kClearBlobWantRanges](peerId) + + try { + for await (const entry of entriesReadStream) { + const { blockOffset, blockLength } = entry.value.blob + this.#syncApi[kAddBlobWantRange](peerId, blockOffset, blockLength) + } + } catch (err) { + if (getErrorCode(err) === 'ERR_STREAM_PREMATURE_CLOSE') return + this.#l.log( + 'Error getting blob entries stream for peer %h: %s', + peerId, + getErrorMessage(err) + ) + } + }) + + this.#coreManager.creatorCore.on('peer-remove', (peer) => { + const peerKey = peer.protomux.stream.remotePublicKey + const peerId = peerKey.toString('hex') + entriesReadStreams.get(peerId)?.destroy() + entriesReadStreams.delete(peerId) + }) + this.#translationApi = new TranslationApi({ dataType: this.#dataTypes.translation, }) @@ -507,6 +569,7 @@ export class MapeoProject extends TypedEmitter { */ async close() { this.#l.log('closing project %h', this.#projectId) + this.#blobStore.close() const dataStorePromises = [] for (const dataStore of Object.values(this.#dataStores)) { dataStorePromises.push(dataStore.close()) @@ -722,8 +785,11 @@ export class MapeoProject extends TypedEmitter { /** @param {boolean} isArchiveDevice */ async [kSetIsArchiveDevice](isArchiveDevice) { + if (this.#isArchiveDevice === isArchiveDevice) return + const blobDownloadFilter = getBlobDownloadFilter(isArchiveDevice) + this.#blobStore.setDownloadFilter(blobDownloadFilter) + this.#syncApi[kSetBlobDownloadFilter](blobDownloadFilter) this.#isArchiveDevice = isArchiveDevice - // TODO: call this.#syncApi[kSetBlobDownloadFilter]() } /** @returns {boolean} */ @@ -989,6 +1055,14 @@ export class MapeoProject extends TypedEmitter { } } +/** + * @param {boolean} isArchiveDevice + * @returns {null | BlobFilter} + */ +function getBlobDownloadFilter(isArchiveDevice) { + return isArchiveDevice ? null : NON_ARCHIVE_DEVICE_DOWNLOAD_FILTER +} + /** * @param {import("@comapeo/schema").ProjectSettings & { forks: string[] }} projectDoc * @returns {EditableProjectSettings} diff --git a/src/sync/core-sync-state.js b/src/sync/core-sync-state.js index b96b241de..8f075708a 100644 --- a/src/sync/core-sync-state.js +++ b/src/sync/core-sync-state.js @@ -182,13 +182,23 @@ export class CoreSyncState { * blocks/ranges that are added here * * @param {PeerId} peerId - * @param {Array<{ start: number, length: number }>} ranges + * @param {number} start + * @param {number} length + * @returns {void} */ - setPeerWants(peerId, ranges) { + addWantRange(peerId, start, length) { const peerState = this.#getOrCreatePeerState(peerId) - for (const { start, length } of ranges) { - peerState.setWantRange({ start, length }) - } + peerState.addWantRange(start, length) + this.#update() + } + + /** + * @param {PeerId} peerId + * @returns {void} + */ + clearWantRanges(peerId) { + const peerState = this.#getOrCreatePeerState(peerId) + peerState.clearWantRanges() this.#update() } @@ -291,14 +301,13 @@ export class PeerState { #preHaves = new RemoteBitfield() /** @type {HypercoreRemoteBitfield | undefined} */ #haves - /** @type {Bitfield} */ - #wants = new RemoteBitfield() + /** + * What blocks do we want? If `null`, we want everything. + * @type {null | Bitfield} + */ + #wants = null /** @type {PeerNamespaceState['status']} */ status = 'stopped' - #wantAll - constructor({ wantAll = true } = {}) { - this.#wantAll = wantAll - } get preHavesBitfield() { return this.#preHaves } @@ -316,18 +325,27 @@ export class PeerState { this.#haves = bitfield } /** - * Set a range of blocks that a peer wants. This is not part of the Hypercore + * Add a range of blocks that a peer wants. This is not part of the Hypercore * protocol, so we need our own extension messages that a peer can use to * inform us which blocks they are interested in. For most cores peers always - * want all blocks, but for blob cores often peers only want preview or + * want all blocks, but for blob cores peers may only want preview or * thumbnail versions of media * - * @param {{ start: number, length: number }} range + * @param {number} start + * @param {number} length + * @returns {void} */ - setWantRange({ start, length }) { - this.#wantAll = false + addWantRange(start, length) { + this.#wants ??= new RemoteBitfield() this.#wants.setRange(start, length, true) } + /** + * Set the range of blocks that this peer wants to the empty set. In other + * words, this peer wants nothing from this core. + */ + clearWantRanges() { + this.#wants = new RemoteBitfield() + } /** * Returns whether the peer has the block at `index`. If a pre-have bitfield * has been passed, this is used if no connected peer bitfield is available. @@ -355,8 +373,7 @@ export class PeerState { * @param {number} index */ want(index) { - if (this.#wantAll) return true - return this.#wants.get(index) + return this.#wants ? this.#wants.get(index) : true } /** * Return the "wants" for the 32 blocks from `index`, as a 32-bit integer @@ -366,11 +383,10 @@ export class PeerState { * the 32 blocks from `index` */ wantWord(index) { - if (this.#wantAll) { - // This is a 32-bit number with all bits set - return 2 ** 32 - 1 - } - return getBitfieldWord(this.#wants, index) + return this.#wants + ? getBitfieldWord(this.#wants, index) + : // This is a 32-bit number with all bits set + 2 ** 32 - 1 } } diff --git a/src/sync/namespace-sync-state.js b/src/sync/namespace-sync-state.js index 699f90094..3080c9d10 100644 --- a/src/sync/namespace-sync-state.js +++ b/src/sync/namespace-sync-state.js @@ -136,6 +136,28 @@ export class NamespaceSyncState { this.#getCoreState(coreDiscoveryId).insertPreHaves(peerId, start, bitfield) } + /** + * @param {string} peerId + * @param {number} start + * @param {number} length + * @returns {void} + */ + addWantRange(peerId, start, length) { + for (const coreState of this.#coreStates.values()) { + coreState.addWantRange(peerId, start, length) + } + } + + /** + * @param {string} peerId + * @returns {void} + */ + clearWantRanges(peerId) { + for (const coreState of this.#coreStates.values()) { + coreState.clearWantRanges(peerId) + } + } + /** * @param {string} discoveryId */ diff --git a/src/sync/sync-api.js b/src/sync/sync-api.js index ea5ed77c4..5ed6e1674 100644 --- a/src/sync/sync-api.js +++ b/src/sync/sync-api.js @@ -16,7 +16,7 @@ import { NO_ROLE_ID } from '../roles.js' /** @import * as http from 'node:http' */ /** @import { CoreOwnership } from '../core-ownership.js' */ /** @import { OpenedNoiseStream } from '../lib/noise-secret-stream-helpers.js' */ -/** @import { ReplicationStream } from '../types.js' */ +/** @import { BlobFilter, ReplicationStream } from '../types.js' */ export const kHandleDiscoveryKey = Symbol('handle discovery key') export const kSyncState = Symbol('sync state') @@ -26,6 +26,8 @@ export const kWaitForInitialSyncWithPeer = Symbol( 'wait for initial sync with peer' ) export const kSetBlobDownloadFilter = Symbol('set isArchiveDevice') +export const kAddBlobWantRange = Symbol('add blob want range') +export const kClearBlobWantRanges = Symbol('clear blob want ranges') /** * @typedef {'initial' | 'full'} SyncType @@ -91,7 +93,8 @@ export class SyncApi extends TypedEmitter { #getReplicationStream /** @type {Map} */ #serverWebsockets = new Map() - #blobDownloadFilter + /** @type {null | BlobFilter} */ + #blobDownloadFilter = null /** * @param {object} opts @@ -100,7 +103,7 @@ export class SyncApi extends TypedEmitter { * @param {import('../roles.js').Roles} opts.roles * @param {() => Promise>} opts.getServerWebsocketUrls * @param {() => ReplicationStream} opts.getReplicationStream - * @param {import('../types.js').BlobFilter | null} opts.blobDownloadFilter + * @param {null | BlobFilter} opts.blobDownloadFilter * @param {number} [opts.throttleMs] * @param {Logger} [opts.logger] */ @@ -116,7 +119,6 @@ export class SyncApi extends TypedEmitter { }) { super() this.#l = Logger.create('syncApi', logger) - this.#blobDownloadFilter = blobDownloadFilter this.#coreManager = coreManager this.#coreOwnership = coreOwnership this.#roles = roles @@ -133,6 +135,8 @@ export class SyncApi extends TypedEmitter { this.#updateState(namespaceSyncState) }) + this[kSetBlobDownloadFilter](blobDownloadFilter) + this.#coreManager.creatorCore.on('peer-add', this.#handlePeerAdd) this.#coreManager.creatorCore.on('peer-remove', this.#handlePeerDisconnect) @@ -161,6 +165,28 @@ export class SyncApi extends TypedEmitter { } } + /** + * Add some blob blocks this peer wants. + * + * @param {string} peerId + * @param {number} start + * @param {number} length + * @returns {void} + */ + [kAddBlobWantRange](peerId, start, length) { + this[kSyncState].addBlobWantRange(peerId, start, length) + } + + /** + * Clear the blob blocks this peer wants. + * + * @param {string} peerId + * @returns {void} + */ + [kClearBlobWantRanges](peerId) { + this[kSyncState].clearBlobWantRanges(peerId) + } + /** @type {import('../local-peers.js').LocalPeersEvents['discovery-key']} */ [kHandleDiscoveryKey](discoveryKey, protomux) { const peerSyncController = this.#peerSyncControllers.get(protomux) diff --git a/src/sync/sync-state.js b/src/sync/sync-state.js index 7e64836a0..d24ecfd42 100644 --- a/src/sync/sync-state.js +++ b/src/sync/sync-state.js @@ -68,6 +68,24 @@ export class SyncState extends TypedEmitter { ]) } + /** + * @param {string} peerId + * @param {number} start + * @param {number} length + * @returns {void} + */ + addBlobWantRange(peerId, start, length) { + this.#syncStates.blob.addWantRange(peerId, start, length) + } + + /** + * @param {string} peerId + * @returns {void} + */ + clearBlobWantRanges(peerId) { + this.#syncStates.blob.clearWantRanges(peerId) + } + #handleUpdate = () => { this.emit('state', this.getState()) } diff --git a/src/types.ts b/src/types.ts index 853bde666..19be646be 100644 --- a/src/types.ts +++ b/src/types.ts @@ -14,6 +14,8 @@ import { Duplex } from 'streamx' import RandomAccessStorage from 'random-access-storage' import { DefaultListener, ListenerSignature } from 'tiny-typed-emitter' import type { NAMESPACES } from './constants.js' +import type { Readable } from 'stream' +import type { HyperdriveEntry } from 'hyperdrive' export type Namespace = (typeof NAMESPACES)[number] @@ -147,3 +149,9 @@ export type DefaultEmitterEvents< newListener: (event: keyof L, listener: L[keyof L]) => void removeListener: (event: keyof L, listener: L[keyof L]) => void } + +export type BlobStoreEntriesStream = Readable & { + [Symbol.asyncIterator](): AsyncIterableIterator< + HyperdriveEntry & { driveId: string } + > +} diff --git a/test-e2e/sync.js b/test-e2e/sync.js index 72a0bf256..56a794608 100644 --- a/test-e2e/sync.js +++ b/test-e2e/sync.js @@ -180,6 +180,148 @@ test('syncing blobs', async (t) => { }) }) +test('non-archive devices only sync a subset of blobs', async (t) => { + const invitor = createManager('invitor', t) + + const fastify = Fastify() + const fastifyController = new FastifyController({ fastify }) + t.after(() => fastifyController.stop()) + const invitee = createManager('invitee', t, { fastify }) + + invitee.setIsArchiveDevice(false) + + const managers = [invitee, invitor] + + await Promise.all([ + invitor.setDeviceInfo({ name: 'invitor', deviceType: 'mobile' }), + invitee.setDeviceInfo({ name: 'invitee', deviceType: 'mobile' }), + fastifyController.start(), + ]) + + const disconnectPeers = connectPeers(managers) + t.after(() => disconnectPeers()) + const projectId = await invitor.createProject({ name: 'Mapeo' }) + await invite({ invitor, invitees: [invitee], projectId }) + + const projects = await Promise.all([ + invitor.getProject(projectId), + invitee.getProject(projectId), + ]) + const [invitorProject, inviteeProject] = projects + + const fixturesPath = new URL('../test/fixtures/', import.meta.url) + + // Test that only previews and thumbnails sync to non-archive devices + + const imagesFixturesPath = new URL('images/', fixturesPath) + const photoFixturePaths = { + original: new URL('02-digidem-logo.jpg', imagesFixturesPath).pathname, + preview: new URL('02-digidem-logo-preview.jpg', imagesFixturesPath) + .pathname, + thumbnail: new URL('02-digidem-logo-thumb.jpg', imagesFixturesPath) + .pathname, + } + const audioFixturePath = new URL('blob-api/audio.mp3', fixturesPath).pathname + + const [photoBlob, audioBlob] = await Promise.all([ + invitorProject.$blobs.create( + photoFixturePaths, + blobMetadata({ mimeType: 'image/jpeg' }) + ), + invitorProject.$blobs.create( + { original: audioFixturePath }, + blobMetadata({ mimeType: 'audio/mpeg' }) + ), + ]) + + invitorProject.$sync.start() + inviteeProject.$sync.start() + + await waitForSync(projects, 'full') + + inviteeProject.$sync.stop() + inviteeProject.$sync.stop() + + /** + * @param {BlobId} blobId + * @param {string} path + */ + const assertLoads = async (blobId, path) => { + const expectedBytesPromise = fs.readFile(path) + + const originalBlobUrl = await inviteeProject.$blobs.getUrl(blobId) + const response = await request(originalBlobUrl, { reset: true }) + assert.equal(response.statusCode, 200) + assert.deepEqual( + Buffer.from(await response.body.arrayBuffer()), + await expectedBytesPromise, + 'blob makes it to the other side' + ) + } + + /** @param {BlobId} blobId */ + const assert404 = async (blobId) => { + const originalBlobUrl = await inviteeProject.$blobs.getUrl(blobId) + const response = await request(originalBlobUrl, { reset: true }) + assert.equal(response.statusCode, 404, 'blob is not synced') + } + + await Promise.all([ + assert404({ ...photoBlob, variant: 'original' }), + assert404({ ...audioBlob, variant: 'original' }), + // We have to tell TypeScript that the blob's type is "photo", which it + // isn't smart enough to figure out. + assertLoads( + { ...photoBlob, type: 'photo', variant: 'preview' }, + photoFixturePaths.preview + ), + assertLoads( + { ...photoBlob, type: 'photo', variant: 'thumbnail' }, + photoFixturePaths.thumbnail + ), + ]) + + // Devices can become archives again and get all the data + + invitee.setIsArchiveDevice(true) + + invitorProject.$sync.start() + inviteeProject.$sync.start() + + await waitForSync(projects, 'full') + + await Promise.all([ + assertLoads( + { ...photoBlob, variant: 'original' }, + photoFixturePaths.original + ), + assertLoads({ ...audioBlob, variant: 'original' }, audioFixturePath), + ]) + + // Devices can toggle whether they're an archive device while sync is running + + invitee.setIsArchiveDevice(false) + + const photoBlob2 = await invitorProject.$blobs.create( + photoFixturePaths, + blobMetadata({ mimeType: 'image/jpeg' }) + ) + + await waitForSync(projects, 'full') + + await Promise.all([ + assert404({ ...photoBlob2, variant: 'original' }), + assertLoads( + { ...photoBlob2, type: 'photo', variant: 'preview' }, + photoFixturePaths.preview + ), + assertLoads( + { ...photoBlob2, type: 'photo', variant: 'thumbnail' }, + photoFixturePaths.thumbnail + ), + ]) +}) + test('start and stop sync', async function (t) { // Checks that both peers need to start syncing for data to sync, and that // $sync.stop() actually stops data syncing diff --git a/test/blob-store/blob-store.js b/test/blob-store/blob-store.js index c6199271e..7bd5e4123 100644 --- a/test/blob-store/blob-store.js +++ b/test/blob-store/blob-store.js @@ -10,10 +10,13 @@ import { createCoreManager, waitForCores, } from '../helpers/core-manager.js' -import { BlobStore } from '../../src/blob-store/index.js' -import { setTimeout } from 'node:timers/promises' +import { + BlobStore, + SUPPORTED_BLOB_VARIANTS, +} from '../../src/blob-store/index.js' import { concat } from '../helpers/blob-store.js' import { discoveryKey } from 'hypercore-crypto' +import { setTimeout as delay } from 'node:timers/promises' // Test with buffers that are 3 times the default blockSize for hyperblobs const TEST_BUF_SIZE = 3 * 64 * 1024 @@ -286,9 +289,9 @@ test('blobStore.writerDriveId', async () => { }) // Tests: -// A) Downloads from peers connected when download() is first called -// B) Downloads from peers connected after download() is first called -test('live download', async function () { +// A) Downloads from peers blobs added before replication +// B) Downloads from peers blobs added after replication +test('download all blobs', async function () { const projectKey = randomBytes(32) const { blobStore: bs1, coreManager: cm1 } = testenv({ projectKey }) const { blobStore: bs2, coreManager: cm2 } = testenv({ projectKey }) @@ -311,17 +314,13 @@ test('live download', async function () { const driveId1 = await bs1.put(blob1Id, blob1) // STEP 2: Replicate CM1 with CM3 const { destroy: destroy1 } = replicate(cm1, cm3) - // STEP 3: Start live download to CM3 - const liveDownload = bs3.download() - // STEP 4: Wait for blobs to be downloaded - await downloaded(liveDownload) - // STEP 5: Replicate CM2 with CM3 + // STEP 3: Replicate CM2 with CM3 const { destroy: destroy2 } = replicate(cm2, cm3) - // STEP 6: Write a blob to CM2 + // STEP 4: Write a blob to CM2 const driveId2 = await bs2.put(blob2Id, blob2) - // STEP 7: Wait for blobs to be downloaded - await downloaded(liveDownload) - // STEP 8: destroy all the replication streams + // STEP 5: Wait for blobs to be downloaded + await delay(200) + // STEP 6: destroy all the replication streams await Promise.all([destroy1(), destroy2()]) // Both blob1 and blob2 (from CM1 and CM2) should have been downloaded to CM3 @@ -337,10 +336,13 @@ test('live download', async function () { ) }) -test('sparse live download', async function () { +test('filtered download, filter changed', async function () { const projectKey = randomBytes(32) const { blobStore: bs1, coreManager: cm1 } = testenv({ projectKey }) - const { blobStore: bs2, coreManager: cm2 } = testenv({ projectKey }) + const { blobStore: bs2, coreManager: cm2 } = testenv({ + projectKey, + downloadFilter: { photo: ['thumbnail', 'preview'] }, + }) const blob1 = randomBytes(TEST_BUF_SIZE) const blob1Id = /** @type {const} */ ({ @@ -367,76 +369,37 @@ test('sparse live download', async function () { const { destroy } = replicate(cm1, cm2) - const liveDownload = bs2.download({ photo: ['original', 'preview'] }) - await downloaded(liveDownload) - - await destroy() + // Wait for blobs to be downloaded + await delay(200) - assert.deepEqual( - await bs2.get({ ...blob1Id, driveId }), - blob1, - 'blob1 was downloaded' - ) assert.deepEqual( await bs2.get({ ...blob2Id, driveId }), blob2, - 'blob2 was downloaded' + 'preview was downloaded' + ) + assert.deepEqual( + await bs2.get({ ...blob3Id, driveId }), + blob3, + 'thumbnail was downloaded' ) await assert.rejects( - () => bs2.get({ ...blob3Id, driveId }), - 'blob3 was not downloaded' + () => bs2.get({ ...blob1Id, driveId }), + 'original was not downloaded' ) -}) -test('cancelled live download', async function () { - const projectKey = randomBytes(32) - const { blobStore: bs1, coreManager: cm1 } = testenv({ projectKey }) - const { blobStore: bs2, coreManager: cm2 } = testenv({ projectKey }) - const { blobStore: bs3, coreManager: cm3 } = testenv({ projectKey }) + // Change the filter to download all + bs2.setDownloadFilter(null) - const blob1 = randomBytes(TEST_BUF_SIZE) - const blob1Id = /** @type {const} */ ({ - type: 'photo', - variant: 'original', - name: 'blob1', - }) - const blob2 = randomBytes(TEST_BUF_SIZE) - const blob2Id = /** @type {const} */ ({ - type: 'photo', - variant: 'original', - name: 'blob2', - }) - - // STEP 1: Write a blob to CM1 - const driveId1 = await bs1.put(blob1Id, blob1) - // STEP 2: Replicate CM1 with CM3 - const { destroy: destroy1 } = replicate(cm1, cm3) - // STEP 3: Start live download to CM3 - const ac = new AbortController() - const liveDownload = bs3.download(undefined, { signal: ac.signal }) - // STEP 4: Wait for blobs to be downloaded - await downloaded(liveDownload) - // STEP 5: Cancel download - ac.abort() - // STEP 6: Replicate CM2 with CM3 - const { destroy: destroy2 } = replicate(cm2, cm3) - // STEP 7: Write a blob to CM2 - const driveId2 = await bs2.put(blob2Id, blob2) - // STEP 8: Wait for blobs to (not) download - await setTimeout(200) - // STEP 9: destroy all the replication streams - await Promise.all([destroy1(), destroy2()]) + // Wait for blobs to be downloaded + await delay(200) - // Both blob1 and blob2 (from CM1 and CM2) should have been downloaded to CM3 assert.deepEqual( - await bs3.get({ ...blob1Id, driveId: driveId1 }), + await bs2.get({ ...blob1Id, driveId }), blob1, - 'blob1 was downloaded' - ) - await assert.rejects( - async () => bs3.get({ ...blob2Id, driveId: driveId2 }), - 'blob2 was not downloaded' + 'original was downloaded' ) + + await destroy() }) test('blobStore.getEntryBlob(driveId, entry)', async () => { @@ -469,7 +432,7 @@ test('blobStore.getEntryReadStream(driveId, entry)', async () => { assert(entry) const buf = await concat( - await blobStore.createEntryReadStream(driveId, entry) + await blobStore.createReadStreamFromEntry(driveId, entry) ) assert.deepEqual(buf, diskbuf, 'should be equal') @@ -493,38 +456,176 @@ test('blobStore.getEntryReadStream(driveId, entry) should not wait', async () => await assert.rejects( async () => { - const stream = await blobStore.createEntryReadStream(driveId, entry) + const stream = await blobStore.createReadStreamFromEntry(driveId, entry) await concat(stream) }, { message: 'Block not available' } ) }) -/** - * @param {Parameters} args - */ -function testenv(...args) { - const coreManager = createCoreManager(...args) - const blobStore = new BlobStore({ coreManager }) - return { blobStore, coreManager } +test('blobStore.createEntriesReadStream({ live: false })', async (t) => { + const { blobStore } = testenv() + const blobIds = Array.from({ length: 50 }, randomBlobId) + + // Add some blobs with unknown variants and types + blobIds.push( + { + // @ts-expect-error + type: 'unknownType', + variant: 'original', + name: randomBytes(8).toString('hex'), + }, + { + type: 'photo', + variant: 'unknownVariant', + name: randomBytes(8).toString('hex'), + }, + { + type: 'photoExtra', + variant: 'original', + name: randomBytes(8).toString('hex'), + } + ) + for (const blobId of blobIds) { + await blobStore.put(blobId, Buffer.from([0])) + } + const inputKeys = blobIds.map(blobIdToKey) + + /** @param {import('../../src/types.js').BlobStoreEntriesStream} entriesStream */ + async function getKeys(entriesStream) { + const keys = new Set() + for await (const entry of entriesStream) { + keys.add(entry.key) + } + return keys + } + + await t.test('no filter, returns everything', async () => { + const expectedKeys = new Set(inputKeys) + const entriesStream = blobStore.createEntriesReadStream() + const keys = await getKeys(entriesStream) + assert.deepEqual(keys, expectedKeys, 'returns all keys') + }) + + await t.test('null filter, returns everything', async () => { + const expectedKeys = new Set(inputKeys) + const entriesStream = blobStore.createEntriesReadStream({ filter: null }) + const keys = await getKeys(entriesStream) + assert.deepEqual(keys, expectedKeys, 'returns all keys') + }) + + await t.test('blob type only, returns all variants', async () => { + const filter = { photo: [] } + const unexpectedKeys = new Set( + inputKeys.filter((key) => key.startsWith('/photo')) + ) + const expectedKeys = new Set( + inputKeys.filter((key) => key.startsWith('/photo/')) + ) + const entriesStream = blobStore.createEntriesReadStream({ filter }) + const keys = await getKeys(entriesStream) + assert.notDeepEqual( + keys, + unexpectedKeys, + 'does not return keys matched without trailing slash' + ) + assert.deepEqual(keys, expectedKeys, 'returns expected keys') + }) + + await t.test('multiple types and variants filter', async () => { + const filter = { + video: ['original'], + photo: ['preview'], + } + const expectedKeys = new Set( + inputKeys.filter( + (key) => + key.startsWith('/video/original/') || + key.startsWith('/photo/preview/') + ) + ) + const entriesStream = blobStore.createEntriesReadStream({ filter }) + const keys = await getKeys(entriesStream) + assert.deepEqual(keys, expectedKeys, 'returns expected keys') + }) + + await t.test('folders filter unknown blob type & variant', async () => { + const filter = { + unknownType: [], + photo: ['unknownVariant'], + } + const entriesStream = blobStore.createEntriesReadStream({ filter }) + const keys = await getKeys(entriesStream) + assert.deepEqual(keys.size, 2) + }) +}) + +test('blobStore.createEntriesReadStream({ live: true })', async () => { + const projectKey = randomBytes(32) + const { blobStore: bs1, coreManager: cm1 } = testenv({ projectKey }) + const { blobStore: bs2, coreManager: cm2 } = testenv({ projectKey }) + const { blobStore: bs3, coreManager: cm3 } = testenv({ projectKey }) + + const blob1 = randomBytes(TEST_BUF_SIZE) + const blob1Id = /** @type {const} */ ({ + type: 'photo', + variant: 'original', + name: 'blob1', + }) + const blob2 = randomBytes(TEST_BUF_SIZE) + const blob2Id = /** @type {const} */ ({ + type: 'photo', + variant: 'original', + name: 'blob2', + }) + const entries = [] + + // STEP 1: Write a blob to CM1 + await bs1.put(blob1Id, blob1) + // STEP 2: Replicate CM1 with CM3 + const { destroy: destroy1 } = replicate(cm1, cm3) + // STEP 3: Start live entries stream from CM3 + const entriesStream = bs3.createEntriesReadStream({ live: true }) + entriesStream.on('data', (entry) => entries.push(entry)) + // STEP 4: Wait for replication + await delay(200) + assert.equal(entries.length, 1, 'entry from replicated blobStore') + // STEP 5: Replicate CM2 with CM3 + const { destroy: destroy2 } = replicate(cm2, cm3) + // STEP 6: Write a blob to CM2 + await bs2.put(blob2Id, blob2) + // STEP 7: Wait for replication + await delay(200) + // STEP 8: destroy all the replication streams + await Promise.all([destroy1(), destroy2()]) + + assert.equal(entries.length, 2, '2 entries from replicated blobStore') +}) + +/** @returns {import('../../src/types.js').BlobId} */ +function randomBlobId() { + const types = /** @type {import('../../src/types.js').BlobType[]} */ ( + Object.keys(SUPPORTED_BLOB_VARIANTS) + ) + const type = types[Math.floor(Math.random() * types.length)] + const variant = + SUPPORTED_BLOB_VARIANTS[type][ + Math.floor(Math.random() * SUPPORTED_BLOB_VARIANTS[type].length) + ] + // @ts-expect-error + return { type, variant, name: randomBytes(8).toString('hex') } +} + +/** @param {import('../../src/types.js').BlobId} blobId */ +function blobIdToKey({ name, type, variant }) { + return `/${type}/${variant}/${name}` } /** - * Resolve when liveDownload status is 'downloaded' - * - * @param {ReturnType} liveDownload - * @returns {Promise} + * @param {Parameters[0] & { downloadFilter?: ConstructorParameters[0]['downloadFilter'] }} opts */ -async function downloaded(liveDownload) { - return new Promise((res) => { - liveDownload.on('state', function onState(state) { - // If liveDownload is created before all cores have been added to the - // replication stream, then initially it will emit `downloaded` (since it - // has downloaded the zero data there is available to download), so we - // also wait for the `downloaded` once data has actually downloaded - if (state.status !== 'downloaded' || state.haveCount === 0) return - liveDownload.off('state', onState) - res() - }) - }) +function testenv({ downloadFilter = null, ...coreManagerOpts } = {}) { + const coreManager = createCoreManager(coreManagerOpts) + const blobStore = new BlobStore({ coreManager, downloadFilter }) + return { blobStore, coreManager } } diff --git a/test/blob-store/combine-states.js b/test/blob-store/combine-states.js deleted file mode 100644 index 119d4c82f..000000000 --- a/test/blob-store/combine-states.js +++ /dev/null @@ -1,149 +0,0 @@ -import { combineStates } from '../../src/blob-store/live-download.js' -import test from 'node:test' -import assert from 'node:assert/strict' - -const partial = { - haveCount: 0, - haveBytes: 0, - wantCount: 0, - wantBytes: 0, - error: null, -} - -const fixtures = /** @type {const} */ ([ - { - statuses: ['checking', 'downloading', 'downloaded'], - expected: 'checking', - }, - { - statuses: ['checking', 'downloading', 'downloading'], - expected: 'checking', - }, - { - statuses: ['downloading', 'downloading', 'downloaded'], - expected: 'downloading', - }, - { - statuses: ['downloaded', 'downloaded', 'downloaded'], - expected: 'downloaded', - }, - { - statuses: ['checking', 'checking', 'checking'], - expected: 'checking', - }, -]) - -test('expected combined state, no error or abort', () => { - for (const { statuses, expected } of fixtures) { - const inputs = statuses.map((status) => ({ state: { ...partial, status } })) - const expectedState = { ...partial, status: expected } - for (const permuted of permute(inputs)) { - assert.deepEqual(combineStates(permuted), expectedState) - } - } -}) - -test('expected combined state, with error', () => { - for (const { statuses } of fixtures) { - const inputs = [ - ...statuses.map((status) => ({ state: { ...partial, status } })), - { - state: { - ...partial, - error: new Error(), - status: /** @type {const} */ ('error'), - }, - }, - ] - const expectedState = { ...partial, error: new Error(), status: 'error' } - for (const permuted of permute(inputs)) { - assert.deepEqual(combineStates(permuted), expectedState) - } - } -}) - -test('expected combined state, with abort', () => { - const controller = new AbortController() - controller.abort() - const { signal } = controller - for (const { statuses } of fixtures) { - const inputs = statuses.map((status) => ({ state: { ...partial, status } })) - const expectedState = { ...partial, status: 'aborted' } - for (const permuted of permute(inputs)) { - assert.deepEqual(combineStates(permuted, { signal }), expectedState) - } - } -}) - -test('arithmetic test', () => { - const counts = [ - [1, 2, 3, 4], - [1, 2, 3, 4], - [1, 2, 3, 4], - ] - const expected = { - haveCount: 3, - haveBytes: 6, - wantCount: 9, - wantBytes: 12, - error: null, - status: 'downloaded', - } - const inputs = counts.map(([haveCount, haveBytes, wantCount, wantBytes]) => { - return { - state: { - haveCount, - haveBytes, - wantCount, - wantBytes, - error: null, - status: /** @type {const} */ ('downloaded'), - }, - } - }) - assert.deepEqual(combineStates(inputs), expected) -}) - -/** - * Returns an iterator of all permutations of the given array. - * - * Implements [Heap's algorithm][0]. - * - * [0]: https://en.wikipedia.org/wiki/Heap%27s_algorithm - * - * @template T - * @param {ReadonlyArray} arr - * @returns {IterableIterator>} - */ -function* permute(arr) { - const c = Array(arr.length).fill(0) - - yield arr - - let i = 1 - while (i < arr.length) { - if (c[i] < i) { - arr = swapping(arr, i % 2 ? c[i] : 0, i) - yield arr - c[i] += 1 - i = 1 - } else { - c[i] = 0 - i += 1 - } - } -} - -/** - * @template T - * @param {ReadonlyArray} arr - * @param {number} index1 - * @param {number} index2 - * @returns {ReadonlyArray} - */ -function swapping(arr, index1, index2) { - const result = arr.slice() - result[index1] = arr[index2] - result[index2] = arr[index1] - return result -} diff --git a/test/blob-store/live-download.js b/test/blob-store/live-download.js deleted file mode 100644 index 293985dfc..000000000 --- a/test/blob-store/live-download.js +++ /dev/null @@ -1,358 +0,0 @@ -import test from 'node:test' -import assert from 'node:assert/strict' -import { DriveLiveDownload } from '../../src/blob-store/live-download.js' -import Hyperdrive from 'hyperdrive' -import Corestore from 'corestore' -import RAM from 'random-access-memory' -import { setTimeout } from 'node:timers/promises' -import { once } from 'node:events' -import { randomBytes } from 'node:crypto' -/** - * @import { - * BlobDownloadState, - * BlobDownloadStateError - * } from '../../src/blob-store/live-download.js' - */ - -// Test with buffers that are 3 times the default blockSize for hyperblobs -const TEST_BUF_SIZE = 3 * 64 * 1024 - -test('live download', async () => { - const { drive1, drive2, replicate } = await testEnv() - - await drive1.put('/foo', randomBytes(TEST_BUF_SIZE)) - const drive1Entry = await drive1.entry('/foo') - assert(drive1Entry) - const { - value: { blob: blob1 }, - } = drive1Entry - - const stream = replicate() - const blobCore2 = (await drive2.getBlobs())?.core - assert(blobCore2) - - const download = new DriveLiveDownload(drive2) - await waitForState(download, 'downloaded') - // Can't use `drive2.get()` here because connected to replication stream, so - // it would download anyway (no `waitFor = false` support for Hyperdrive yet) - assert( - await blobCore2.has( - blob1.blockOffset, - blob1.blockOffset + blob1.blockLength - ), - 'First blob is downloaded' - ) - assert(blob1.blockLength > 1, 'Blob is more than one block length') - - const expected = randomBytes(TEST_BUF_SIZE) - await drive1.put('/bar', expected) - - await waitForState(download, 'downloaded') - stream.destroy() - await once(stream, 'close') - - assert.deepEqual( - await drive2.get('/bar'), - expected, - 'Second blob is downloaded' - ) -}) - -test('sparse live download', async () => { - const { drive1, drive2, replicate } = await testEnv() - - const buf1 = randomBytes(TEST_BUF_SIZE) - const buf2 = randomBytes(TEST_BUF_SIZE) - - await drive1.put('photo/original/one', buf1) - await drive1.put('video/original/one', randomBytes(TEST_BUF_SIZE)) - - const stream = replicate() - - const download = new DriveLiveDownload(drive2, { - filter: { photo: ['original'] }, - }) - await waitForState(download, 'downloaded') - - await drive1.put('photo/original/two', buf2) - await drive1.put('video/original/two', randomBytes(TEST_BUF_SIZE)) - await waitForState(download, 'downloaded') - - stream.destroy() - await once(stream, 'close') - - assert.deepEqual(await drive2.get('photo/original/one'), buf1) - assert.deepEqual(await drive2.get('photo/original/two'), buf2) - - await assert.rejects( - drive2.get('video/original/one', { wait: false }), - { - message: /BLOCK_NOT_AVAILABLE/, - }, - 'Block not available' - ) - await assert.rejects( - drive2.get('video/original/two', { wait: false }), - { - message: /BLOCK_NOT_AVAILABLE/, - }, - 'Block not available' - ) -}) - -test('Abort download (same tick)', async () => { - const { drive1, drive2, replicate } = await testEnv() - await drive1.put('/foo', randomBytes(TEST_BUF_SIZE)) - const stream = replicate() - const controller = new AbortController() - const download = new DriveLiveDownload(drive2, { signal: controller.signal }) - controller.abort() - stream.destroy() - await once(stream, 'close') - assert.deepEqual(download.state, { - haveCount: 0, - haveBytes: 0, - wantCount: 0, - wantBytes: 0, - error: null, - status: 'aborted', - }) - assert.equal(await drive2.get('/foo'), null, 'nothing downloaded') -}) - -test('Abort download (next event loop)', async () => { - const { drive1, drive2, replicate } = await testEnv() - await drive1.put('/one', randomBytes(TEST_BUF_SIZE)) - const stream = replicate() - const controller = new AbortController() - const download = new DriveLiveDownload(drive2, { signal: controller.signal }) - // This is the only way to trigger abort before the entryStream loop - await drive2.getBlobs() - controller.abort() - stream.destroy() - await once(stream, 'close') - assert.deepEqual(download.state, { - haveCount: 0, - haveBytes: 0, - wantCount: 0, - wantBytes: 0, - error: null, - status: 'aborted', - }) - await assert.rejects( - drive2.get('/foo', { wait: false }), - { - message: /Block not available locally/, - }, - 'Block not available locally' - ) -}) - -test('Abort download (after initial download)', async () => { - const { drive1, drive2, replicate } = await testEnv() - - const buf1 = randomBytes(TEST_BUF_SIZE) - await drive1.put('/one', buf1) - - const stream = replicate() - const controller = new AbortController() - const download = new DriveLiveDownload(drive2, { signal: controller.signal }) - await waitForState(download, 'downloaded') - - controller.abort() - - await drive1.put('/two', randomBytes(TEST_BUF_SIZE)) - - // Nothing should happen here, but allow some time to see if it does - await setTimeout(200) - - stream.destroy() - await once(stream, 'close') - - assert.deepEqual(await drive2.get('/one'), buf1, 'First blob is downloaded') - await assert.rejects( - drive2.get('/two', { wait: false }), - { - message: /BLOCK_NOT_AVAILABLE/, - }, - 'Second blob is not downloaded' - ) -}) - -test('Live download when data is already downloaded', async () => { - const { drive1, drive2, replicate } = await testEnv() - - const buf1 = randomBytes(20) - await drive1.put('/one', buf1) - - const stream1 = replicate() - - await drive2.db.core.update({ wait: true }) - await drive2.download() - assert.deepEqual(await drive2.get('/one'), buf1, 'First blob is downloaded') - - stream1.destroy() - await once(stream1, 'close') - - const stream2 = replicate() - const download = new DriveLiveDownload(drive2) - await waitForState(download, 'downloaded') - assert.deepEqual( - download.state, - { - haveCount: 1, - haveBytes: buf1.byteLength, - wantCount: 0, - wantBytes: 0, - error: null, - status: 'downloaded', - }, - 'Blob already downloaded is included in state' - ) - - const buf2 = randomBytes(TEST_BUF_SIZE) - await drive1.put('/two', buf2) - await waitForState(download, 'downloaded') - - stream2.destroy() - await once(stream2, 'close') - - assert.deepEqual(await drive2.get('/two'), buf2, 'Second blob is downloaded') -}) - -test('Live download continues across disconnection and reconnect', async () => { - const { drive1, drive2, replicate } = await testEnv() - - const buf1 = randomBytes(TEST_BUF_SIZE) - await drive1.put('/one', buf1) - - const stream1 = replicate() - - const download = new DriveLiveDownload(drive2) - await waitForState(download, 'downloaded') - - assert.deepEqual(await drive2.get('/one'), buf1, 'First blob is downloaded') - - stream1.destroy() - await once(stream1, 'close') - - const buf2 = randomBytes(TEST_BUF_SIZE) - await drive1.put('/two', buf2) - - const stream2 = replicate() - await waitForState(download, 'downloaded') - - stream2.destroy() - await once(stream2, 'close') - - assert.deepEqual(await drive2.get('/two'), buf2, 'Second blob is downloaded') -}) - -test('Initial status', async () => { - const { drive1 } = await testEnv() - - const download = new DriveLiveDownload(drive1) - assert.equal( - download.state.status, - 'checking', - "initial status is 'checking'" - ) -}) - -test('Unitialized drive with no data', async () => { - // This test is important because it catches an edge case where a drive might - // have been added by its key, but has never replicated, so it has no data so - // the content feed will never be read from the header, which might result in - // it forever being in the 'checking' status. This tests that we catch this - // and resolve status to 'downloaded'. - const { drive2 } = await testEnv() - const download = new DriveLiveDownload(drive2) - await waitForState(download, 'downloaded') - assert.equal( - download.state.status, - 'downloaded', - 'uninitialized drive without peers results in `downloaded` state' - ) -}) - -test('live download started before initial replication', async () => { - const { drive1, drive2, replicate } = await testEnv() - - await drive1.put('/foo', randomBytes(TEST_BUF_SIZE)) - const drive1Entry = await drive1.entry('/foo') - assert(drive1Entry) - const { - value: { blob: blob1 }, - } = drive1Entry - - const download = new DriveLiveDownload(drive2) - await waitForState(download, 'downloaded') - // initially drive2 is not replicating and empty, so we expect a 'downloaded' status - assert.equal(download.state.status, 'downloaded') - - const stream = replicate() - const blobCore2 = (await drive2.getBlobs())?.core - assert(blobCore2) - await waitForState(download, 'downloaded') - - // Can't use `drive2.get()` here because connected to replication stream, so - // it would download anyway (no `waitFor = false` support for Hyperdrive yet) - assert( - await blobCore2.has( - blob1.blockOffset, - blob1.blockOffset + blob1.blockLength - ), - 'First blob is downloaded' - ) - assert(blob1.blockLength > 1, 'Blob is more than one block length') - - const expected = randomBytes(TEST_BUF_SIZE) - await drive1.put('/bar', expected) - - await waitForState(download, 'downloaded') - stream.destroy() - await once(stream, 'close') - - assert.deepEqual( - await drive2.get('/bar'), - expected, - 'Second blob is downloaded' - ) -}) - -/** - * @param {DriveLiveDownload} download - * @param {(BlobDownloadState | BlobDownloadStateError)['status']} status - * @returns {Promise} - */ -async function waitForState(download, status) { - return new Promise((res) => { - download.on('state', function onState(state) { - // console.log('download state', state) - if (state.status !== status) return - download.off('state', onState) - res() - }) - }) -} - -async function testEnv() { - const store1 = new Corestore(() => new RAM()) - const store2 = new Corestore(() => new RAM()) - const drive1 = new Hyperdrive(store1) - await drive1.ready() - const drive2 = new Hyperdrive(store2, drive1.key) - await drive2.ready() - - function replicate() { - const s = store1.replicate(true) - s.pipe(store2.replicate(false)).pipe(s) - return s - } - - return { - drive1, - drive2, - replicate, - } -} diff --git a/test/blob-store/utils.js b/test/blob-store/utils.js new file mode 100644 index 000000000..08516fd98 --- /dev/null +++ b/test/blob-store/utils.js @@ -0,0 +1,50 @@ +import test from 'node:test' +import assert from 'node:assert/strict' +import { filePathMatchesFilter } from '../../src/blob-store/utils.js' + +test('filePathMatchesFilter', () => { + const filter = { + photo: ['a', 'b'], + video: [], + } + + const shouldMatch = [ + '/photo/a/foo.jpg', + '/photo/b/foo.jpg', + '/photo/a/', + '/video/foo.mp4', + '/video/foo/bar.mp4', + '/video/', + '/video///', + ] + for (const filePath of shouldMatch) { + assert( + filePathMatchesFilter(filter, filePath), + `${filePath} matches filter` + ) + } + + const shouldntMatch = [ + '/photo/c/foo.jpg', + '/photo/c/', + '/photo/a', + '/photo/ax/foo.jpg', + '/photo/c/../a/foo.jpg', + '/photo', + '/photo/', + '/photo//', + '/PHOTO/a/foo.jpg', + '/audio/a/foo.mp3', + 'photo/a/foo.jpg', + '//photo/a/foo.jpg', + ' /photo/a/foo.jpg', + '/hasOwnProperty/', + '/hasOwnProperty/a/foo.jpg', + ] + for (const filePath of shouldntMatch) { + assert( + !filePathMatchesFilter(filter, filePath), + `${filePath} doesn't match filter` + ) + } +}) diff --git a/test/lib/error.js b/test/lib/error.js index c7de6ea98..56f9ccc6c 100644 --- a/test/lib/error.js +++ b/test/lib/error.js @@ -1,6 +1,10 @@ import assert from 'node:assert/strict' import test, { describe } from 'node:test' -import { ErrorWithCode, getErrorMessage } from '../../src/lib/error.js' +import { + ErrorWithCode, + getErrorCode, + getErrorMessage, +} from '../../src/lib/error.js' describe('ErrorWithCode', () => { test('ErrorWithCode with two arguments', () => { @@ -22,6 +26,44 @@ describe('ErrorWithCode', () => { }) }) +describe('getErrorCode', () => { + test('from values without a string code', () => { + class ErrorWithNumericCode extends Error { + code = 123 + } + + const testCases = [ + undefined, + null, + 'ignored', + { code: 'ignored' }, + new Error('has no code'), + new ErrorWithNumericCode(), + ] + + for (const testCase of testCases) { + assert.equal(getErrorCode(testCase), undefined) + } + }) + + test('from Errors with a string code', () => { + class ErrorWithInheritedCode extends Error { + get code() { + return 'foo' + } + } + + const testCases = [ + new ErrorWithCode('foo', 'message'), + new ErrorWithInheritedCode(), + ] + + for (const testCase of testCases) { + assert.equal(getErrorCode(testCase), 'foo') + } + }) +}) + describe('getErrorMessage', () => { test('from objects without a string message', () => { const testCases = [ diff --git a/test/sync/core-sync-state.js b/test/sync/core-sync-state.js index 3f3a45de0..4a7b13d3e 100644 --- a/test/sync/core-sync-state.js +++ b/test/sync/core-sync-state.js @@ -408,7 +408,7 @@ function createState({ have, prehave, want, status }) { // 53 because the max safe integer in JS is 53 bits for (let i = 0; i < 53; i++) { if ((bigInt >> BigInt(i)) & BigInt(1)) { - peerState.setWantRange({ start: i, length: 1 }) + peerState.addWantRange(i, 1) } } } @@ -480,16 +480,14 @@ async function downloadCore(core, bits) { function setPeerWants(state, peerId, bits) { if (typeof bits === 'undefined') return if (bits > Number.MAX_SAFE_INTEGER) throw new Error() + state.clearWantRanges(peerId) const bigInt = BigInt(bits) - /** @type {{ start: number, length: number}[]} */ - const ranges = [] // 53 because the max safe integer in JS is 53 bits for (let i = 0; i < 53; i++) { if ((bigInt >> BigInt(i)) & BigInt(1)) { - ranges.push({ start: i, length: 1 }) + state.addWantRange(peerId, i, 1) } } - state.setPeerWants(peerId, ranges) } /** diff --git a/types/hyperbee.d.ts b/types/hyperbee.d.ts new file mode 100644 index 000000000..9aa4054bb --- /dev/null +++ b/types/hyperbee.d.ts @@ -0,0 +1,168 @@ +declare module 'hyperbee' { + import type { TypedEmitter } from 'tiny-typed-emitter' + import Hypercore from 'hypercore' + import { EventEmitter } from 'events' + import { Readable } from 'streamx' + + type Encoding = 'binary' | 'utf-8' | 'ascii' | 'json' | AbstractEncoding + + declare namespace Hyperbee { + interface HyperbeeOptions { + keyEncoding?: Encoding + valueEncoding?: Encoding + } + + interface HyperbeeEntry { + seq: number + key: string + value: T + } + + interface PutOptions { + cas?: ( + prev: HyperbeeEntry, + next: HyperbeeEntry + ) => boolean | PromiseLike + } + + interface DelOptions { + cas?: (prev: T) => boolean | PromiseLike + } + + interface ReadStreamRange { + gt?: string + gte?: string + lt?: string + lte?: string + } + + interface ReadStreamOptions { + reverse?: boolean + limit?: number + } + + interface HistoryStreamOptions extends ReadStreamOptions { + live?: boolean + reverse?: boolean + gte?: number + gt?: number + lte?: number + lt?: number + // These options missing from the docs + keyEncoding?: Encoding + valueEncoding?: Encoding + encoding?: Encoding + } + + interface DiffStreamEntry { + left: T + right: T + } + + interface DiffStreamOptions extends ReadStreamOptions {} + + interface GetAndWatchOptions { + keyEncoding?: 'binary' | 'utf-8' | 'ascii' | 'json' | AbstractEncoding + valueEncoding?: 'binary' | 'utf-8' | 'ascii' | 'json' | AbstractEncoding + } + + interface SubDatabaseOptions extends HyperbeeOptions { + sep?: Buffer + } + + interface HeaderOptions {} + } + + class Hyperbee { + constructor(core: Hypercore, options?: Hyperbee.HyperbeeOptions) + + ready(): Promise + close(): Promise + + readonly core: Hypercore + readonly version: number + // Below are not yet implemented on the version of hyperbee we're using + // readonly id: string + // readonly key: null | Buffer + // readonly discoveryKey: null | Buffer + // readonly writable: boolean + // readonly readable: boolean + // getBySeq( + // seq: number, + // options?: any + // ): Promise, 'seq'> | null> + + put( + key: string, + value?: any, + options?: Hyperbee.PutOptions + ): Promise + del(key: string, options?: Hyperbee.DelOptions): Promise + get(key: string): Promise | null> + + batch(): HyperbeeBatch + replicate(isInitiatorOrStream: any): Readable + createReadStream( + range?: Hyperbee.ReadStreamRange, + options?: Hyperbee.ReadStreamOptions + ): Readable> + peek( + range?: Hyperbee.ReadStreamRange, + options?: Hyperbee.ReadStreamOptions + ): Promise | null> + createHistoryStream(options?: Hyperbee.HistoryStreamOptions): Readable< + Hyperbee.HyperbeeEntry & { + type: 'put' | 'del' + } + > + createDiffStream( + otherVersion: number, + options?: Hyperbee.DiffStreamOptions + ): Readable> + + getAndWatch( + key: string, + options?: Hyperbee.GetAndWatchOptions + ): Promise> + watch( + range?: Hyperbee.ReadStreamRange + ): AsyncIterable<[any, any]> & { close: () => Promise } + + checkout(version: number): Hyperbee + snapshot(): Hyperbee + + sub(prefix: string, options?: Hyperbee.SubDatabaseOptions): Hyperbee + getHeader(options?: any): Promise + + static isHyperbee(core: Hypercore, options?: any): Promise + } + + class HyperbeeBatch { + put(key: string, value?: T, options?: PutOptions): Promise + get(key: string): Promise | null> + del(key: string, options?: DelOptions): Promise + flush(): Promise + close(): Promise + } + + class EntryWatcher extends TypedEmitter<{ + update: () => void + }> { + node: { seq: number; key: string; value: T } + + close(): Promise + } + + interface AbstractEncoding { + encode: (data: T) => Buffer + encode: (data: T, buffer: Buffer) => Buffer + encode: (data: T, buffer: Buffer, offset: number) => Buffer + encode: (data: T, buffer?: Buffer, offset: number) => Buffer + decode: (buffer: Buffer) => T + decode: (buffer: Buffer, offset: number) => T + decode: (buffer: Buffer, offset: number, end: number) => T + decode: (buffer: Buffer, offset?: number, end: number) => T + } + + export = Hyperbee +} diff --git a/types/hyperdrive.d.ts b/types/hyperdrive.d.ts index 3dd708a15..a795cb05e 100644 --- a/types/hyperdrive.d.ts +++ b/types/hyperdrive.d.ts @@ -2,6 +2,7 @@ declare module 'hyperdrive' { import Corestore from 'corestore' import Hypercore from 'hypercore' import Hyperblobs, { BlobId } from 'hyperblobs' + import Hyperbee from 'hyperbee' import { Readable, Writable } from 'streamx' import { TypedEmitter } from 'tiny-typed-emitter' import { JsonValue } from 'type-fest' @@ -33,16 +34,14 @@ declare module 'hyperdrive' { } namespace Hyperdrive { - export interface HyperdriveEntry { - seq: number - key: string - value: { - executable: boolean // whether the blob at path is an executable - linkname: null | string // if entry not symlink, otherwise a string to the entry this links to - blob: BlobId // a Hyperblob id that can be used to fetch the blob associated with this entry - metadata: JsonValue - } + interface HyperdriveEntryValue { + executable: boolean // whether the blob at path is an executable + linkname: null | string // if entry not symlink, otherwise a string to the entry this links to + blob: BlobId // a Hyperblob id that can be used to fetch the blob associated with this entry + metadata: JsonValue } + export interface HyperdriveEntry + extends Hyperbee.HyperbeeEntry {} } class Hyperdrive extends TypedEmitter { @@ -58,7 +57,7 @@ declare module 'hyperdrive' { readonly key: Buffer | null readonly discoveryKey: Buffer | null readonly contentKey: Buffer | null // The public key of the Hyperblobs instance holding blobs associated with entries in the drive. - readonly db: any // Hyperbee + readonly db: Hyperbee readonly version: number ready(): Promise update(options?: { wait?: boolean }): Promise @@ -70,7 +69,7 @@ declare module 'hyperdrive' { path: string, opts?: HyperdriveGetOpts ): Promise - getBlobs(): Promise + getBlobs(): Promise get( path: string, opts?: { follow?: boolean } & HyperdriveGetOpts @@ -102,6 +101,7 @@ declare module 'hyperdrive' { path: string, opts?: { diff?: boolean } ): Promise<{ blocks: number } | null> + close(): Promise } export = Hyperdrive diff --git a/types/unix-path-resolve.d.ts b/types/unix-path-resolve.d.ts new file mode 100644 index 000000000..11eef5474 --- /dev/null +++ b/types/unix-path-resolve.d.ts @@ -0,0 +1,4 @@ +declare module 'unix-path-resolve' { + function resolve(path: string, path: string): string + export = resolve +}