Skip to content

Commit

Permalink
WIP more cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
gmaclennan committed Oct 28, 2024
1 parent 9f12572 commit 7e6fa62
Showing 1 changed file with 30 additions and 11 deletions.
41 changes: 30 additions & 11 deletions src/blob-store/downloader.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,19 @@ class State {
}
}

// This class contains a large amount of parallel async code, and contains lots
// of references and some listeners that need to be deferenced when this class
// is finished with (e.g when a download is complete, or there is an error).
// I've highlighted lines which could throw an error which would put the
// downloader in an "error" state. Currently this does not emit an "error"
// event, but we may want to add one for _live_ downloaders in the future.
// Non-live downloaders can return error state in `done()`.

/**
* Like hyperdrive.download() but optionally 'live', and for multiple drives.
* Emits `state` events with the current download state.
* Emits `state` events with the current download state. A 'live' downloader
* must be `destroy()`ed to clean up resources and avoid memory leaks. A
* non-live downloader will clean up after itself when it is done.
*
* NB: unlike hyperdrive.download(), this will also download deleted and
* previous versions of blobs - we don't currently support editing or deleting
Expand All @@ -114,7 +124,6 @@ export class Downloader extends TypedEmitter {
#processEntriesPromise
#ac = new AbortController()
#state
#live
#pathPrefixes

/**
Expand All @@ -125,7 +134,6 @@ export class Downloader extends TypedEmitter {
*/
constructor(drives, { filter, live = false } = {}) {
super()
this.#live = live
this.#state = new State({ live })
this.#pathPrefixes = filter ? pathPrefixesFromFilters(filter) : []

Expand All @@ -142,12 +150,12 @@ export class Downloader extends TypedEmitter {
* Start processing entries from the entries stream - if an entry matches the
* filter, and we don't already have it, queue it for download. If the
* Downloader is live, this method will never resolve, otherwise it will
* resolve when all the entries have been processed, but not necessarily
* downloaded.
* resolve when all the entries have been processed and downloaded.
*
* @param {Hyperdrive[]} drives
*/
async #processEntries(drives) {
// ERROR HANDLING: Should only throw if drive.ready() throws for any drive
await Promise.all(drives.map((drive) => this[kAddDrive](drive)))
for await (const entry of this.#entriesStream) {
this.#ac.signal.throwIfAborted()
Expand All @@ -167,14 +175,21 @@ export class Downloader extends TypedEmitter {
}
if (!this.#shouldDownloadFile(filePath)) continue
const drive = this.#drivesById.get(driveId)
// ERROR HANDLING: this is unexpected and should not happen
if (!drive) throw new Error('Drive not found: ' + driveId)
// ERROR HANDLING: this should not throw
const core = await getBlobsCore(drive, { signal: this.#ac.signal })
this.#ac.signal.throwIfAborted()
// ERROR HANDLING: this will throw if core.has() throws
await this.#processEntry(core, blob)
this.#ac.signal.throwIfAborted()
this.emit('state', this.#state.value)
// This loop will never end if live.
}
await Promise.all(
Array.from(this.#state.queuedDownloads, (download) => download.done())
)
this.#cleanup()
}

/** @param {string} filePath */
Expand Down Expand Up @@ -212,6 +227,8 @@ export class Downloader extends TypedEmitter {
this.emit('state', this.#state.value)
})
.catch((e) => {
// ERROR HANDLING: _should_ only happen if the download is destroyed
if (this.#state.error) return
this.#state.error = e
this.#ac.abort(e)
})
Expand All @@ -223,7 +240,6 @@ export class Downloader extends TypedEmitter {

/** @param {import('hyperdrive')} drive */
async [kAddDrive](drive) {
this.#ac.signal.throwIfAborted()
await drive.ready()
this.#ac.signal.throwIfAborted()
if (!drive.key) throw new Error('Unexpected: missing drive key') // should never happen
Expand All @@ -235,23 +251,26 @@ export class Downloader extends TypedEmitter {
)
}

/**
* Cancel the downloads and clean up resources.
*/
destroy() {
this.#ac.abort()
}

/**
* Will resolve when all blobs have been downloaded. Will never resolve for a
* live downloader.
*/
async done() {
if (this.#live) throw new Error('Live downloader will never be done')
await this.#processEntriesPromise
await Promise.all(
Array.from(this.#state.queuedDownloads, (download) => download.done())
)
this.#cleanup()
}

#cleanup = () => {
for (const download of this.#state.queuedDownloads) download.destroy()
this.#ac.signal.removeEventListener('abort', this.#cleanup)
this.#entriesStream.removeListener('error', this.#ac.abort)
// queuedDownloads should always be empty by here anyway, but just in case.
this.#state.queuedDownloads.clear()
this.#state.initialLengthsByDriveId.clear()
this.#drivesById.clear()
Expand Down

0 comments on commit 7e6fa62

Please sign in to comment.