Skip to content

Commit

Permalink
fix tests, fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
gmaclennan committed Oct 29, 2024
1 parent cb4d1c8 commit 35f297d
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 528 deletions.
22 changes: 11 additions & 11 deletions src/blob-store/downloader.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ export class Downloader extends TypedEmitter {
this.#driveIndex = driveIndex

this.#entriesStream = createEntriesStream(driveIndex, { live: true })
this.#entriesStream.once('error', this.#ac.abort)
this.#entriesStream.once('error', this.#handleError)

this.#ac.signal.addEventListener('abort', this.#handleAbort, { once: true })

this.#processEntriesPromise = this.#processEntries()
this.#processEntriesPromise.catch(this.#ac.abort)
this.#processEntriesPromise.catch(this.#handleError)
}

/**
Expand Down Expand Up @@ -99,10 +99,8 @@ export class Downloader extends TypedEmitter {
this.#queuedDownloads.add(download)
download
.done()
.catch((e) => {
// According to the code, this should never throw.
this.#ac.abort(e)
})
// According to the code, this should never throw.
.catch(this.#handleError)
.finally(() => {
this.#queuedDownloads.delete(download)
})
Expand All @@ -115,12 +113,14 @@ export class Downloader extends TypedEmitter {
this.#ac.abort()
}

/** @param {any} error */
#handleError = (error) => {
if (this.#ac.signal.aborted) return
this.emit('error', error)
this.#ac.abort(error)
}

#handleAbort = () => {
const abortReason = this.#ac.signal.reason
const wasAbortedByDestroy = abortReason && abortReason.name === 'AbortError'
if (!wasAbortedByDestroy) {
this.emit('error', abortReason)
}
for (const download of this.#queuedDownloads) download.destroy()
this.#ac.signal.removeEventListener('abort', this.#handleAbort)
this.#entriesStream.removeListener('error', this.#ac.abort)
Expand Down
5 changes: 3 additions & 2 deletions src/blob-store/entries-stream.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import SubEncoder from 'sub-encoder'
import mergeStreams from '@sindresorhus/merge-streams'
import { Transform } from 'node:stream'
import { Transform, pipeline } from 'node:stream'
import { noop } from '../utils.js'

/** @import Hyperdrive from 'hyperdrive' */
/** @import { BlobStoreEntriesStream } from '../types.js' */
Expand Down Expand Up @@ -48,7 +49,7 @@ function getHistoryStream(bee, { live }) {
// under the `files` sub-encoding key
keyEncoding,
})
return historyStream.pipe(new AddDriveIds(bee.core))
return pipeline(historyStream, new AddDriveIds(bee.core), noop)
}

class AddDriveIds extends Transform {
Expand Down
2 changes: 1 addition & 1 deletion src/blob-store/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ export class BlobStore extends TypedEmitter {
*
* @param {object} opts
* @param {boolean} [opts.live=false] Set to `true` to get a live stream of entries
* @param {import('../types.js').BlobFilter | null} [opts.filter] Filter blob types and/or variants in returned entries. Filter is { [BlobType]: BlobVariants[] }.
* @param {import('./utils.js').GenericBlobFilter | null} [opts.filter] Filter blob types and/or variants in returned entries. Filter is { [BlobType]: BlobVariants[] }.
* @returns
*/
createEntriesReadStream({ live = false, filter } = {}) {
Expand Down
4 changes: 4 additions & 0 deletions src/blob-store/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import { Transform } from 'node:stream'
export function pathPrefixesFromFilter(filter) {
const pathPrefixes = []
for (const [type, variants] of Object.entries(filter)) {
if (variants.length === 0) {
pathPrefixes.push(`/${type}/`)
continue
}
const dedupedVariants = new Set(variants)
for (const variant of dedupedVariants) {
pathPrefixes.push(`/${type}/${variant}/`)
Expand Down
214 changes: 58 additions & 156 deletions test/blob-store/blob-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import {
BlobStore,
SUPPORTED_BLOB_VARIANTS,
} from '../../src/blob-store/index.js'
import { setTimeout } from 'node:timers/promises'
import { concat } from '../helpers/blob-store.js'
import { discoveryKey } from 'hypercore-crypto'
import { setTimeout as delay } from 'node:timers/promises'
Expand Down Expand Up @@ -290,9 +289,9 @@ test('blobStore.writerDriveId', async () => {
})

// Tests:
// A) Downloads from peers connected when download() is first called
// B) Downloads from peers connected after download() is first called
test.skip('live download', async function () {
// A) Downloads from peers blobs added before replication
// B) Downloads from peers blobs added after replication
test('download all blobs', async function () {
const projectKey = randomBytes(32)
const { blobStore: bs1, coreManager: cm1 } = testenv({ projectKey })
const { blobStore: bs2, coreManager: cm2 } = testenv({ projectKey })
Expand All @@ -315,17 +314,13 @@ test.skip('live download', async function () {
const driveId1 = await bs1.put(blob1Id, blob1)
// STEP 2: Replicate CM1 with CM3
const { destroy: destroy1 } = replicate(cm1, cm3)
// STEP 3: Start live download to CM3
const liveDownload = bs3.download()
// STEP 4: Wait for blobs to be downloaded
await downloaded(liveDownload)
// STEP 5: Replicate CM2 with CM3
// STEP 3: Replicate CM2 with CM3
const { destroy: destroy2 } = replicate(cm2, cm3)
// STEP 6: Write a blob to CM2
// STEP 4: Write a blob to CM2
const driveId2 = await bs2.put(blob2Id, blob2)
// STEP 7: Wait for blobs to be downloaded
await downloaded(liveDownload)
// STEP 8: destroy all the replication streams
// STEP 5: Wait for blobs to be downloaded
await delay(200)
// STEP 6: destroy all the replication streams
await Promise.all([destroy1(), destroy2()])

// Both blob1 and blob2 (from CM1 and CM2) should have been downloaded to CM3
Expand All @@ -341,10 +336,13 @@ test.skip('live download', async function () {
)
})

test.skip('sparse live download', async function () {
test('filtered download, filter changed', async function () {
const projectKey = randomBytes(32)
const { blobStore: bs1, coreManager: cm1 } = testenv({ projectKey })
const { blobStore: bs2, coreManager: cm2 } = testenv({ projectKey })
const { blobStore: bs2, coreManager: cm2 } = testenv({
projectKey,
downloadFilter: { photo: ['thumbnail', 'preview'] },
})

const blob1 = randomBytes(TEST_BUF_SIZE)
const blob1Id = /** @type {const} */ ({
Expand All @@ -371,77 +369,37 @@ test.skip('sparse live download', async function () {

const { destroy } = replicate(cm1, cm2)

const liveDownload = bs2.download({
filter: { photo: ['original', 'preview'] },
})
await downloaded(liveDownload)

await destroy()
// Wait for blobs to be downloaded
await delay(200)

assert.deepEqual(
await bs2.get({ ...blob1Id, driveId }),
blob1,
'blob1 was downloaded'
)
assert.deepEqual(
await bs2.get({ ...blob2Id, driveId }),
blob2,
'blob2 was downloaded'
'preview was downloaded'
)
assert.deepEqual(
await bs2.get({ ...blob3Id, driveId }),
blob3,
'thumbnail was downloaded'
)
await assert.rejects(
() => bs2.get({ ...blob3Id, driveId }),
'blob3 was not downloaded'
() => bs2.get({ ...blob1Id, driveId }),
'original was not downloaded'
)
})

test.skip('cancelled live download', async function () {
const projectKey = randomBytes(32)
const { blobStore: bs1, coreManager: cm1 } = testenv({ projectKey })
const { blobStore: bs2, coreManager: cm2 } = testenv({ projectKey })
const { blobStore: bs3, coreManager: cm3 } = testenv({ projectKey })
// Change the filter to download all
bs2.setDownloadFilter(null)

const blob1 = randomBytes(TEST_BUF_SIZE)
const blob1Id = /** @type {const} */ ({
type: 'photo',
variant: 'original',
name: 'blob1',
})
const blob2 = randomBytes(TEST_BUF_SIZE)
const blob2Id = /** @type {const} */ ({
type: 'photo',
variant: 'original',
name: 'blob2',
})

// STEP 1: Write a blob to CM1
const driveId1 = await bs1.put(blob1Id, blob1)
// STEP 2: Replicate CM1 with CM3
const { destroy: destroy1 } = replicate(cm1, cm3)
// STEP 3: Start live download to CM3
const liveDownload = bs3.download()
// STEP 4: Wait for blobs to be downloaded
await downloaded(liveDownload)
// STEP 5: Cancel download
liveDownload.destroy()
// STEP 6: Replicate CM2 with CM3
const { destroy: destroy2 } = replicate(cm2, cm3)
// STEP 7: Write a blob to CM2
const driveId2 = await bs2.put(blob2Id, blob2)
// STEP 8: Wait for blobs to (not) download
await setTimeout(200)
// STEP 9: destroy all the replication streams
await Promise.all([destroy1(), destroy2()])
// Wait for blobs to be downloaded
await delay(200)

// Both blob1 and blob2 (from CM1 and CM2) should have been downloaded to CM3
assert.deepEqual(
await bs3.get({ ...blob1Id, driveId: driveId1 }),
await bs2.get({ ...blob1Id, driveId }),
blob1,
'blob1 was downloaded'
)
await assert.rejects(
async () => bs3.get({ ...blob2Id, driveId: driveId2 }),
'blob2 was not downloaded'
'original was downloaded'
)

await destroy()
})

test('blobStore.getEntryBlob(driveId, entry)', async () => {
Expand Down Expand Up @@ -542,29 +500,29 @@ test('blobStore.createEntriesReadStream({ live: false })', async (t) => {
return keys
}

await t.test('no folders filter, returns everything', async () => {
await t.test('no filter, returns everything', async () => {
const expectedKeys = new Set(inputKeys)
const entriesStream = blobStore.createEntriesReadStream()
const keys = await getKeys(entriesStream)
assert.deepEqual(keys, expectedKeys, 'returns all keys')
})

await t.test('[] folders filter, returns everything', async () => {
await t.test('null filter, returns everything', async () => {
const expectedKeys = new Set(inputKeys)
const entriesStream = blobStore.createEntriesReadStream({ folders: [] })
const entriesStream = blobStore.createEntriesReadStream({ filter: null })
const keys = await getKeys(entriesStream)
assert.deepEqual(keys, expectedKeys, 'returns all keys')
})

await t.test('single folders filter', async () => {
const folders = ['/photo']
await t.test('blob type only, returns all variants', async () => {
const filter = { photo: [] }
const unexpectedKeys = new Set(
inputKeys.filter((key) => key.startsWith(folders[0]))
inputKeys.filter((key) => key.startsWith('/photo'))
)
const expectedKeys = new Set(
inputKeys.filter((key) => key.startsWith(addTrailingSlash(folders[0])))
inputKeys.filter((key) => key.startsWith('/photo/'))
)
const entriesStream = blobStore.createEntriesReadStream({ folders })
const entriesStream = blobStore.createEntriesReadStream({ filter })
const keys = await getKeys(entriesStream)
assert.notDeepEqual(
keys,
Expand All @@ -574,61 +532,29 @@ test('blobStore.createEntriesReadStream({ live: false })', async (t) => {
assert.deepEqual(keys, expectedKeys, 'returns expected keys')
})

await t.test('multiple folders filter, no subfolder', async () => {
const folders = ['/video/original', '/photo/preview']
await t.test('multiple types and variants filter', async () => {
const filter = {
video: ['original'],
photo: ['preview'],
}
const expectedKeys = new Set(
inputKeys.filter((key) =>
folders.find((folder) => key.startsWith(addTrailingSlash(folder)))
inputKeys.filter(
(key) =>
key.startsWith('/video/original/') ||
key.startsWith('/photo/preview/')
)
)
const entriesStream = blobStore.createEntriesReadStream({ folders })
const keys = await getKeys(entriesStream)
assert.deepEqual(keys, expectedKeys, 'returns expected keys')
})

await t.test('multiple folders filter, subfolder', async () => {
const folders = ['/photo/original', '/photo']
const expectedKeys = new Set(
inputKeys.filter((key) => key.startsWith(addTrailingSlash(folders[1])))
)
const entriesStream = blobStore.createEntriesReadStream({ folders })
const keys = await getKeys(entriesStream)
assert.deepEqual(keys, expectedKeys, 'returns expected keys')
})

await t.test('folders filter with trailing slashes', async () => {
const folders = ['/photo/original/']
const expectedKeys = new Set(
inputKeys.filter((key) => key.startsWith(addTrailingSlash(folders[0])))
)
const entriesStream = blobStore.createEntriesReadStream({ folders })
const keys = await getKeys(entriesStream)
assert.deepEqual(keys, expectedKeys, 'returns expected keys')
})

await t.test('folders filter without leading slash', async () => {
const folders = ['photo/original']
const expectedKeys = new Set(
inputKeys.filter((key) => key.startsWith('/photo/original/'))
)
const entriesStream = blobStore.createEntriesReadStream({ folders })
const keys = await getKeys(entriesStream)
assert.deepEqual(keys, expectedKeys, 'returns expected keys')
})

await t.test('folders filter windows separator', async () => {
const folders = ['C:\\photo\\original']
const expectedKeys = new Set(
inputKeys.filter((key) => key.startsWith('/photo/original/'))
)
const entriesStream = blobStore.createEntriesReadStream({ folders })
const entriesStream = blobStore.createEntriesReadStream({ filter })
const keys = await getKeys(entriesStream)
assert.deepEqual(keys, expectedKeys, 'returns expected keys')
})

await t.test('folders filter unknown blob type & variant', async () => {
const folders = ['/unknownType', '/photo/unknownVariant']
const entriesStream = blobStore.createEntriesReadStream({ folders })
const filter = {
unknownType: [],
photo: ['unknownVariant'],
}
const entriesStream = blobStore.createEntriesReadStream({ filter })
const keys = await getKeys(entriesStream)
assert.deepEqual(keys.size, 2)
})
Expand Down Expand Up @@ -694,36 +620,12 @@ function randomBlobId() {
function blobIdToKey({ name, type, variant }) {
return `/${type}/${variant}/${name}`
}
/** @param {string} path */
function addTrailingSlash(path) {
return path.endsWith('/') ? path : `${path}/`
}

/**
* @param {Parameters<typeof createCoreManager>} args
* @param {Parameters<typeof createCoreManager>[0] & { downloadFilter?: ConstructorParameters<typeof BlobStore>[0]['downloadFilter'] }} opts
*/
function testenv(...args) {
const coreManager = createCoreManager(...args)
const blobStore = new BlobStore({ coreManager })
function testenv({ downloadFilter = null, ...coreManagerOpts } = {}) {
const coreManager = createCoreManager(coreManagerOpts)
const blobStore = new BlobStore({ coreManager, downloadFilter })
return { blobStore, coreManager }
}

/**
* Resolve when liveDownload status is 'downloaded'
*
* @param {import('../../src/blob-store/downloader.js').Downloader} liveDownload
* @returns {Promise<void>}
*/
async function downloaded(liveDownload) {
return new Promise((res) => {
liveDownload.on('state', function onState(state) {
// If liveDownload is created before all cores have been added to the
// replication stream, then initially it will emit `downloaded` (since it
// has downloaded the zero data there is available to download), so we
// also wait for the `downloaded` once data has actually downloaded
if (state.status !== 'downloaded' || state.haveCount === 0) return
liveDownload.off('state', onState)
res()
})
})
}
Loading

0 comments on commit 35f297d

Please sign in to comment.