Skip to content

Commit

Permalink
feat: handle incoming blob filters (#956)
Browse files Browse the repository at this point in the history
*You may wish to [review this squashed commit as separate commits][0].*

When you receive a blob filter from another peer, this updates their
sync states. For example, if you receive a blob filter that says "I only
want photo thumbnails", that peer's "wants" bitfield will be reduced.

Closes [#682] and [#905].

[0]: https://github.com/digidem/comapeo-core/pull/956/commits
[#682]: #682
[#905]: #905
  • Loading branch information
EvanHahn authored Nov 20, 2024
1 parent a4067fe commit 5ae541d
Show file tree
Hide file tree
Showing 15 changed files with 423 additions and 168 deletions.
6 changes: 3 additions & 3 deletions src/blob-store/downloader.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { TypedEmitter } from 'tiny-typed-emitter'
import { createEntriesStream } from './entries-stream.js'
import { filePathMatchesFilter } from './utils.js'

/** @import Hyperdrive from 'hyperdrive' */
/** @import { BlobFilter } from '../types.js' */
/** @import { THyperdriveIndex } from './hyperdrive-index.js' */

/**
* Like hyperdrive.download() but 'live', and for multiple drives.
Expand All @@ -26,7 +26,7 @@ import { filePathMatchesFilter } from './utils.js'
* @extends {TypedEmitter<{ error: (error: Error) => void }>}
*/
export class Downloader extends TypedEmitter {
/** @type {import('./index.js').THyperdriveIndex} */
/** @type {THyperdriveIndex} */
#driveIndex
/** @type {Set<{ done(): Promise<void>, destroy(): void }>} */
#queuedDownloads = new Set()
Expand All @@ -36,7 +36,7 @@ export class Downloader extends TypedEmitter {
#shouldDownloadFile

/**
* @param {import('./index.js').THyperdriveIndex} driveIndex
* @param {THyperdriveIndex} driveIndex
* @param {object} [options]
* @param {BlobFilter | null} [options.filter] Filter blobs of specific types and/or sizes to download
*/
Expand Down
3 changes: 2 additions & 1 deletion src/blob-store/entries-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import { noop } from '../utils.js'

/** @import Hyperdrive from 'hyperdrive' */
/** @import { BlobStoreEntriesStream } from '../types.js' */
/** @import { THyperdriveIndex } from './hyperdrive-index.js' */

const keyEncoding = new SubEncoder('files', 'utf-8')

/**
*
* @param {import('./index.js').THyperdriveIndex} driveIndex
* @param {THyperdriveIndex} driveIndex
* @param {object} opts
* @param {boolean} [opts.live=false]
* @returns {BlobStoreEntriesStream}
Expand Down
122 changes: 122 additions & 0 deletions src/blob-store/hyperdrive-index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import b4a from 'b4a'
import { discoveryKey } from 'hypercore-crypto'
import Hyperdrive from 'hyperdrive'
import util from 'node:util'
import { TypedEmitter } from 'tiny-typed-emitter'

/** @typedef {HyperdriveIndexImpl} THyperdriveIndex */

/**
* @extends {TypedEmitter<{ 'add-drive': (drive: Hyperdrive) => void }>}
*/
export class HyperdriveIndexImpl extends TypedEmitter {
/** @type {Map<string, Hyperdrive>} */
#hyperdrives = new Map()
#writer
#writerKey
/** @param {import('../core-manager/index.js').CoreManager} coreManager */
constructor(coreManager) {
super()
/** @type {undefined | Hyperdrive} */
let writer
const corestore = new PretendCorestore({ coreManager })
const blobIndexCores = coreManager.getCores('blobIndex')
const writerCoreRecord = coreManager.getWriterCore('blobIndex')
this.#writerKey = writerCoreRecord.key
for (const { key } of blobIndexCores) {
// @ts-ignore - we know pretendCorestore is not actually a Corestore
const drive = new Hyperdrive(corestore, key)
// We use the discovery key to derive the id for a drive
this.#hyperdrives.set(getDiscoveryId(key), drive)
if (key.equals(this.#writerKey)) {
writer = drive
}
}
if (!writer) {
throw new Error('Could not find a writer for the blobIndex namespace')
}
this.#writer = writer

coreManager.on('add-core', ({ key, namespace }) => {
if (namespace !== 'blobIndex') return
// We use the discovery key to derive the id for a drive
const driveId = getDiscoveryId(key)
if (this.#hyperdrives.has(driveId)) return
// @ts-ignore - we know pretendCorestore is not actually a Corestore
const drive = new Hyperdrive(corestore, key)
this.#hyperdrives.set(driveId, drive)
this.emit('add-drive', drive)
})
}
get writer() {
return this.#writer
}
get writerKey() {
return this.#writerKey
}
[Symbol.iterator]() {
return this.#hyperdrives.values()
}
/** @param {string} driveId */
get(driveId) {
return this.#hyperdrives.get(driveId)
}
}

/**
* Implements the `get()` method as used by hyperdrive-next. It returns the
* relevant cores from the Mapeo CoreManager.
*/
class PretendCorestore {
#coreManager
/**
* @param {object} options
* @param {import('../core-manager/index.js').CoreManager} options.coreManager
*/
constructor({ coreManager }) {
this.#coreManager = coreManager
}

/**
* @param {Buffer | { publicKey: Buffer } | { name: string }} opts
* @returns {import('hypercore')<"binary", Buffer> | undefined}
*/
get(opts) {
if (b4a.isBuffer(opts)) {
opts = { publicKey: opts }
}
if ('key' in opts) {
// @ts-ignore
opts.publicKey = opts.key
}
if ('publicKey' in opts) {
// NB! We should always add blobIndex (Hyperbee) cores to the core manager
// before we use them here. We would only reach the addCore path if the
// blob core is read from the hyperbee header (before it is added to the
// core manager)
return (
this.#coreManager.getCoreByKey(opts.publicKey) ||
this.#coreManager.addCore(opts.publicKey, 'blob').core
)
} else if (opts.name === 'db') {
return this.#coreManager.getWriterCore('blobIndex').core
} else if (opts.name.includes('blobs')) {
return this.#coreManager.getWriterCore('blob').core
} else {
throw new Error(
'Unsupported corestore.get() with opts ' + util.inspect(opts)
)
}
}

/** no-op */
close() {}
}

/**
* @param {Buffer} key Public key of hypercore
* @returns {string} Hex-encoded string of derived discovery key
*/
function getDiscoveryId(key) {
return discoveryKey(key).toString('hex')
}
117 changes: 3 additions & 114 deletions src/blob-store/index.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import Hyperdrive from 'hyperdrive'
import b4a from 'b4a'
import util from 'node:util'
import { pipeline } from 'node:stream'
import { discoveryKey } from 'hypercore-crypto'
import { Downloader } from './downloader.js'
import { createEntriesStream } from './entries-stream.js'
import { FilterEntriesStream } from './utils.js'
import { noop } from '../utils.js'
import { TypedEmitter } from 'tiny-typed-emitter'
import { HyperdriveIndexImpl as HyperdriveIndex } from './hyperdrive-index.js'

/** @import Hyperdrive from 'hyperdrive' */
/** @import { JsonObject } from 'type-fest' */
/** @import { Readable as NodeReadable } from 'node:stream' */
/** @import { Readable as StreamxReadable, Writable } from 'streamx' */
Expand Down Expand Up @@ -132,7 +131,7 @@ export class BlobStore extends TypedEmitter {
* @param {object} opts
* @param {boolean} [opts.live=false] Set to `true` to get a live stream of entries
* @param {import('./utils.js').GenericBlobFilter | null} [opts.filter] Filter blob types and/or variants in returned entries. Filter is { [BlobType]: BlobVariants[] }.
* @returns
* @returns {BlobStoreEntriesStream}
*/
createEntriesReadStream({ live = false, filter } = {}) {
const entriesStream = createEntriesStream(this.#driveIndex, { live })
Expand Down Expand Up @@ -249,66 +248,6 @@ export class BlobStore extends TypedEmitter {
}
}

// Don't want to export the class, but do want to export the type.
/** @typedef {HyperdriveIndex} THyperdriveIndex */

/**
* @extends {TypedEmitter<{ 'add-drive': (drive: Hyperdrive) => void }>}
*/
class HyperdriveIndex extends TypedEmitter {
/** @type {Map<string, Hyperdrive>} */
#hyperdrives = new Map()
#writer
#writerKey
/** @param {import('../core-manager/index.js').CoreManager} coreManager */
constructor(coreManager) {
super()
/** @type {undefined | Hyperdrive} */
let writer
const corestore = new PretendCorestore({ coreManager })
const blobIndexCores = coreManager.getCores('blobIndex')
const writerCoreRecord = coreManager.getWriterCore('blobIndex')
this.#writerKey = writerCoreRecord.key
for (const { key } of blobIndexCores) {
// @ts-ignore - we know pretendCorestore is not actually a Corestore
const drive = new Hyperdrive(corestore, key)
// We use the discovery key to derive the id for a drive
this.#hyperdrives.set(getDiscoveryId(key), drive)
if (key.equals(this.#writerKey)) {
writer = drive
}
}
if (!writer) {
throw new Error('Could not find a writer for the blobIndex namespace')
}
this.#writer = writer

coreManager.on('add-core', ({ key, namespace }) => {
if (namespace !== 'blobIndex') return
// We use the discovery key to derive the id for a drive
const driveId = getDiscoveryId(key)
if (this.#hyperdrives.has(driveId)) return
// @ts-ignore - we know pretendCorestore is not actually a Corestore
const drive = new Hyperdrive(corestore, key)
this.#hyperdrives.set(driveId, drive)
this.emit('add-drive', drive)
})
}
get writer() {
return this.#writer
}
get writerKey() {
return this.#writerKey
}
[Symbol.iterator]() {
return this.#hyperdrives.values()
}
/** @param {string} driveId */
get(driveId) {
return this.#hyperdrives.get(driveId)
}
}

/**
* @template {object} T
* @template {object} U
Expand All @@ -334,56 +273,6 @@ function makePath({ type, variant, name }) {
return `/${type}/${variant}/${name}`
}

/**
* Implements the `get()` method as used by hyperdrive-next. It returns the
* relevant cores from the Mapeo CoreManager.
*/
class PretendCorestore {
#coreManager
/**
* @param {object} options
* @param {import('../core-manager/index.js').CoreManager} options.coreManager
*/
constructor({ coreManager }) {
this.#coreManager = coreManager
}

/**
* @param {Buffer | { publicKey: Buffer } | { name: string }} opts
* @returns {import('hypercore')<"binary", Buffer> | undefined}
*/
get(opts) {
if (b4a.isBuffer(opts)) {
opts = { publicKey: opts }
}
if ('key' in opts) {
// @ts-ignore
opts.publicKey = opts.key
}
if ('publicKey' in opts) {
// NB! We should always add blobIndex (Hyperbee) cores to the core manager
// before we use them here. We would only reach the addCore path if the
// blob core is read from the hyperbee header (before it is added to the
// core manager)
return (
this.#coreManager.getCoreByKey(opts.publicKey) ||
this.#coreManager.addCore(opts.publicKey, 'blob').core
)
} else if (opts.name === 'db') {
return this.#coreManager.getWriterCore('blobIndex').core
} else if (opts.name.includes('blobs')) {
return this.#coreManager.getWriterCore('blob').core
} else {
throw new Error(
'Unsupported corestore.get() with opts ' + util.inspect(opts)
)
}
}

/** no-op */
close() {}
}

/**
* @param {Buffer} key Public key of hypercore
* @returns {string} Hex-encoded string of derived discovery key
Expand Down
3 changes: 2 additions & 1 deletion src/discovery/local-discovery.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import StartStopStateMachine from 'start-stop-state-machine'
import pTimeout from 'p-timeout'
import { keyToPublicId } from '@mapeo/crypto'
import { Logger } from '../logger.js'
import { getErrorCode } from '../lib/error.js'
/** @import { OpenedNoiseStream } from '../lib/noise-secret-stream-helpers.js' */

/** @typedef {{ publicKey: Buffer, secretKey: Buffer }} Keypair */
Expand Down Expand Up @@ -117,7 +118,7 @@ export class LocalDiscovery extends TypedEmitter {

/** @param {Error} e */
function onSocketError(e) {
if ('code' in e && e.code === 'EPIPE') {
if (getErrorCode(e) === 'EPIPE') {
socket.destroy()
if (secretStream) {
secretStream.destroy()
Expand Down
3 changes: 2 additions & 1 deletion src/fastify-plugins/maps.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { ReaderWatch, Server as SMPServerPlugin } from 'styled-map-package'

import { noop } from '../utils.js'
import { NotFoundError, ENOENTError } from './utils.js'
import { getErrorCode } from '../lib/error.js'

/** @import { FastifyPluginAsync } from 'fastify' */
/** @import { Stats } from 'node:fs' */
Expand Down Expand Up @@ -56,7 +57,7 @@ export async function plugin(fastify, opts) {
try {
stats = await fs.stat(customMapPath)
} catch (err) {
if (err instanceof Error && 'code' in err && err.code === 'ENOENT') {
if (getErrorCode(err) === 'ENOENT') {
throw new ENOENTError(customMapPath)
}

Expand Down
24 changes: 24 additions & 0 deletions src/lib/error.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,30 @@ export class ErrorWithCode extends Error {
}
}

/**
* If the argument is an `Error` instance, return its `code` property if it is a string.
* Otherwise, returns `undefined`.
*
* @param {unknown} maybeError
* @returns {undefined | string}
* @example
* try {
* // do something
* } catch (err) {
* console.error(getErrorCode(err))
* }
*/
export function getErrorCode(maybeError) {
if (
maybeError instanceof Error &&
'code' in maybeError &&
typeof maybeError.code === 'string'
) {
return maybeError.code
}
return undefined
}

/**
* Get the error message from an object if possible.
* Otherwise, stringify the argument.
Expand Down
Loading

0 comments on commit 5ae541d

Please sign in to comment.