diff --git a/src/blob-store/downloader.js b/src/blob-store/downloader.js index f07f0f4d..fd1ae968 100644 --- a/src/blob-store/downloader.js +++ b/src/blob-store/downloader.js @@ -96,9 +96,19 @@ class State { } } +// This class contains a large amount of parallel async code, and contains lots +// of references and some listeners that need to be deferenced when this class +// is finished with (e.g when a download is complete, or there is an error). +// I've highlighted lines which could throw an error which would put the +// downloader in an "error" state. Currently this does not emit an "error" +// event, but we may want to add one for _live_ downloaders in the future. +// Non-live downloaders can return error state in `done()`. + /** * Like hyperdrive.download() but optionally 'live', and for multiple drives. - * Emits `state` events with the current download state. + * Emits `state` events with the current download state. A 'live' downloader + * must be `destroy()`ed to clean up resources and avoid memory leaks. A + * non-live downloader will clean up after itself when it is done. * * NB: unlike hyperdrive.download(), this will also download deleted and * previous versions of blobs - we don't currently support editing or deleting @@ -114,7 +124,6 @@ export class Downloader extends TypedEmitter { #processEntriesPromise #ac = new AbortController() #state - #live #pathPrefixes /** @@ -125,7 +134,6 @@ export class Downloader extends TypedEmitter { */ constructor(drives, { filter, live = false } = {}) { super() - this.#live = live this.#state = new State({ live }) this.#pathPrefixes = filter ? pathPrefixesFromFilters(filter) : [] @@ -142,12 +150,12 @@ export class Downloader extends TypedEmitter { * Start processing entries from the entries stream - if an entry matches the * filter, and we don't already have it, queue it for download. If the * Downloader is live, this method will never resolve, otherwise it will - * resolve when all the entries have been processed, but not necessarily - * downloaded. + * resolve when all the entries have been processed and downloaded. * * @param {Hyperdrive[]} drives */ async #processEntries(drives) { + // ERROR HANDLING: Should only throw if drive.ready() throws for any drive await Promise.all(drives.map((drive) => this[kAddDrive](drive))) for await (const entry of this.#entriesStream) { this.#ac.signal.throwIfAborted() @@ -167,14 +175,21 @@ export class Downloader extends TypedEmitter { } if (!this.#shouldDownloadFile(filePath)) continue const drive = this.#drivesById.get(driveId) + // ERROR HANDLING: this is unexpected and should not happen if (!drive) throw new Error('Drive not found: ' + driveId) + // ERROR HANDLING: this should not throw const core = await getBlobsCore(drive, { signal: this.#ac.signal }) this.#ac.signal.throwIfAborted() + // ERROR HANDLING: this will throw if core.has() throws await this.#processEntry(core, blob) this.#ac.signal.throwIfAborted() this.emit('state', this.#state.value) // This loop will never end if live. } + await Promise.all( + Array.from(this.#state.queuedDownloads, (download) => download.done()) + ) + this.#cleanup() } /** @param {string} filePath */ @@ -212,6 +227,8 @@ export class Downloader extends TypedEmitter { this.emit('state', this.#state.value) }) .catch((e) => { + // ERROR HANDLING: _should_ only happen if the download is destroyed + if (this.#state.error) return this.#state.error = e this.#ac.abort(e) }) @@ -223,7 +240,6 @@ export class Downloader extends TypedEmitter { /** @param {import('hyperdrive')} drive */ async [kAddDrive](drive) { - this.#ac.signal.throwIfAborted() await drive.ready() this.#ac.signal.throwIfAborted() if (!drive.key) throw new Error('Unexpected: missing drive key') // should never happen @@ -235,23 +251,26 @@ export class Downloader extends TypedEmitter { ) } + /** + * Cancel the downloads and clean up resources. + */ destroy() { this.#ac.abort() } + /** + * Will resolve when all blobs have been downloaded. Will never resolve for a + * live downloader. + */ async done() { - if (this.#live) throw new Error('Live downloader will never be done') await this.#processEntriesPromise - await Promise.all( - Array.from(this.#state.queuedDownloads, (download) => download.done()) - ) - this.#cleanup() } #cleanup = () => { for (const download of this.#state.queuedDownloads) download.destroy() this.#ac.signal.removeEventListener('abort', this.#cleanup) this.#entriesStream.removeListener('error', this.#ac.abort) + // queuedDownloads should always be empty by here anyway, but just in case. this.#state.queuedDownloads.clear() this.#state.initialLengthsByDriveId.clear() this.#drivesById.clear()