Skip to content

Commit

Permalink
WIP: IterableWeakMap for referencing live external objects
Browse files Browse the repository at this point in the history
  • Loading branch information
gmaclennan committed Oct 25, 2024
1 parent 30a552e commit 04da19f
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 55 deletions.
22 changes: 19 additions & 3 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@
"undici": "^6.13.0",
"unix-path-resolve": "^1.0.2",
"varint": "^6.0.0",
"weakref": "^0.2.1",
"yauzl-promise": "^4.0.0"
}
}
57 changes: 28 additions & 29 deletions src/blob-store/downloader.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ import { noop } from '../utils.js'
* @property {(state: BlobDownloadState | BlobDownloadStateError ) => void} state Emitted with the current download state whenever it changes (not emitted during initial 'checking' status)
*/

const kAddDrive = Symbol('addDrive to downloader')

/**
* @param {Downloader} downloader
* @param {Hyperdrive} drive
*/
export function addDriveToDownloader(downloader, drive) {
downloader[kAddDrive](drive)
}

class State {
haveCount = 0
haveBytes = 0
Expand Down Expand Up @@ -72,51 +82,24 @@ export class Downloader extends TypedEmitter {
#ac = new AbortController()
#state

/** @param {import('hyperdrive')} drive */
#addDrive = (drive) => {
if (drive.key) {
this.#drivesById.set(drive.key.toString('hex'), drive)
return
}
drive
.ready()
.then(() => {
if (!drive.key) return // should never happen
this.#drivesById.set(drive.key.toString('hex'), drive)
})
.catch(noop)
}

/**
* Like drive.download() but 'live', and for multiple drives
* @param {Array<import('hyperdrive')>} drives
* @param {import('./index.js').InternalDriveEmitter} driveEmitter
* @param {object} [options]
* @param {import('../types.js').BlobFilter} [options.filter] Filter blobs of specific types and/or sizes to download
* @param {boolean} [options.live=false]
*/
constructor(drives, driveEmitter, { filter, live = false } = {}) {
constructor(drives, { filter, live = false } = {}) {
super()
this.#state = new State({ live })

this.#entriesStream = createEntriesStream(drives, driveEmitter, {
this.#entriesStream = createEntriesStream(drives, {
live,
folders: filterToFolders(filter),
})

this.#donePromise = this.#start()
this.#donePromise.catch(noop)

if (!live) return

driveEmitter.on('add-drive', this.#addDrive)
this.#ac.signal.addEventListener(
'abort',
() => {
driveEmitter.off('add-drive', this.#addDrive)
},
{ once: true }
)
}

async #start() {
Expand Down Expand Up @@ -171,6 +154,22 @@ export class Downloader extends TypedEmitter {
}
}

/** @param {import('hyperdrive')} drive */
[kAddDrive](drive) {
if (this.#ac.signal.aborted) return
if (drive.key) {
this.#drivesById.set(drive.key.toString('hex'), drive)
return
}
drive
.ready()
.then(() => {
if (!drive.key) return // should never happen
this.#drivesById.set(drive.key.toString('hex'), drive)
})
.catch(noop)
}

done() {
return this.#donePromise
}
Expand Down
24 changes: 15 additions & 9 deletions src/blob-store/entries-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,38 @@ import unixPathResolve from 'unix-path-resolve'
/** @import { BlobStoreEntriesStream } from '../types.js' */

const keyEncoding = new SubEncoder('files', 'utf-8')
const kAddDrive = Symbol('addDrive to entries stream')

/**
* @param {BlobStoreEntriesStream} entriesStream
* @param {Hyperdrive} drive
*/
export function addDriveToEntriesStream(entriesStream, drive) {
// @ts-expect-error - We don't expose this method in the type
entriesStream[kAddDrive](drive)
}

/**
*
* @param {Array<Hyperdrive>} drives
* @param {import('./index.js').InternalDriveEmitter} driveEmitter
* @param {object} opts
* @param {boolean} [opts.live=false]
* @param {readonly string[]} [opts.folders]
* @returns {BlobStoreEntriesStream}
*/
export function createEntriesStream(
drives,
driveEmitter,
{ live = false, folders = ['/'] } = {}
) {
folders = normalizeFolders(folders)
const mergedEntriesStreams = mergeStreams(
drives.map((drive) => getFilteredHistoryStream(drive.db, { folders, live }))
)
if (live) {
driveEmitter.on('add-drive', addDrive)
mergedEntriesStreams.on('close', () => {
driveEmitter.off('add-drive', addDrive)
})
}
// @ts-expect-error
Object.defineProperty(mergedEntriesStreams, kAddDrive, {
value: addDrive,
writable: false,
enumerable: false,
})
return mergedEntriesStreams

/** @param {Hyperdrive} drive */
Expand Down
46 changes: 32 additions & 14 deletions src/blob-store/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,24 @@ import Hyperdrive from 'hyperdrive'
import b4a from 'b4a'
import util from 'node:util'
import { discoveryKey } from 'hypercore-crypto'
import { TypedEmitter } from 'tiny-typed-emitter'
import { Downloader } from './downloader.js'
import { createEntriesStream } from './entries-stream.js'
import { addDriveToDownloader, Downloader } from './downloader.js'
import {
addDriveToEntriesStream,
createEntriesStream,
} from './entries-stream.js'
import { IterableWeakSet } from 'weakref'

/** @import { JsonObject } from 'type-fest' */
/** @import { Readable as NodeReadable } from 'node:stream' */
/** @import { Readable as StreamxReadable, Writable } from 'streamx' */
/** @import { BlobFilter, BlobId } from '../types.js' */
/** @import { BlobFilter, BlobId, BlobStoreEntriesStream } from '../types.js' */
/** @import { BlobDownloadEvents } from './downloader.js' */

/**
* @internal
* @typedef {NodeReadable | StreamxReadable} Readable
*/

/** @typedef {TypedEmitter<{ 'add-drive': (drive: import('hyperdrive')) => void }>} InternalDriveEmitter */

// prop = blob type name
// value = array of blob variants supported for that type
const SUPPORTED_BLOB_VARIANTS = /** @type {const} */ ({
Expand All @@ -42,11 +44,10 @@ export class BlobStore {
/** @type {Map<string, Hyperdrive>} Indexed by hex-encoded discovery key */
#hyperdrives = new Map()
#writer
/**
* Used to communicate to live download instances when new drives are added
* @type {InternalDriveEmitter}
*/
#driveEmitter = new TypedEmitter()
/** @type {IterableWeakSet<Downloader>} */
#liveDownloaders = new IterableWeakSet()
/** @type {IterableWeakSet<BlobStoreEntriesStream} */

Check failure on line 49 in src/blob-store/index.js

View workflow job for this annotation

GitHub Actions / build (macos-latest, 18.x)

'>' expected.

Check failure on line 49 in src/blob-store/index.js

View workflow job for this annotation

GitHub Actions / build (macos-latest, 20.x)

'>' expected.

Check failure on line 49 in src/blob-store/index.js

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 18.x)

'>' expected.

Check failure on line 49 in src/blob-store/index.js

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 20.x)

'>' expected.

Check failure on line 49 in src/blob-store/index.js

View workflow job for this annotation

GitHub Actions / build (windows-latest, 18.x)

'>' expected.

Check failure on line 49 in src/blob-store/index.js

View workflow job for this annotation

GitHub Actions / build (windows-latest, 20.x)

'>' expected.
#liveEntriesStreams = new IterableWeakSet()

/**
* @param {object} options
Expand Down Expand Up @@ -80,7 +81,17 @@ export class BlobStore {
// @ts-ignore - we know pretendCorestore is not actually a Corestore
const drive = new Hyperdrive(corestore, key)
this.#hyperdrives.set(driveId, drive)
this.#driveEmitter.emit('add-drive', drive)
for (const downloader of this.#liveDownloaders) {
addDriveToDownloader(downloader, drive)
}
for (const entriesStream of this.#liveEntriesStreams) {
try {
addDriveToEntriesStream(entriesStream, drive)
} catch {
// This happens when the stream is already closed, so we can remove our reference.
this.#liveEntriesStreams.delete(entriesStream)
}
}
})
}

Expand Down Expand Up @@ -131,10 +142,14 @@ export class BlobStore {
*/
download({ filter, live = false } = {}) {
const drives = Array.from(this.#hyperdrives.values())
return new Downloader(drives, this.#driveEmitter, {
const downloader = new Downloader(drives, {
filter,
live,
})
if (live) {
this.#liveDownloaders.add(downloader)
}
return downloader
}

/**
Expand Down Expand Up @@ -167,10 +182,13 @@ export class BlobStore {
*/
createEntriesReadStream({ live = false, folders } = {}) {
const drives = Array.from(this.#hyperdrives.values())
const entriesStream = createEntriesStream(drives, this.#driveEmitter, {
const entriesStream = createEntriesStream(drives, {
live,
folders,
})
if (live) {
this.#liveEntriesStreams.add(entriesStream)
}
return entriesStream
}

Expand Down
1 change: 1 addition & 0 deletions types/hyperdrive.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ declare module 'hyperdrive' {
path: string,
opts?: { diff?: boolean }
): Promise<{ blocks: number } | null>
close(): Promise<void>
}

export = Hyperdrive
Expand Down

0 comments on commit 04da19f

Please sign in to comment.