Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement media server in MapeoManager #365

Merged
merged 27 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
39bc4cb
update blob api
achou11 Oct 31, 2023
077492c
major refactoring
achou11 Oct 31, 2023
7f5284c
update blob plugin tests
achou11 Oct 31, 2023
421b010
set up media server in MapeoManager
achou11 Nov 1, 2023
df3544b
rename getBaseUrl opt to getMediaBaseUrl in BlobApi
achou11 Nov 1, 2023
b2f0aec
small format fix
achou11 Nov 1, 2023
81d909a
create MediaServer class
achou11 Nov 6, 2023
b44c2fa
remove unused symbol
achou11 Nov 6, 2023
2800c72
simplify manager start + stop test
achou11 Nov 6, 2023
8bef7ef
shorten names of prefix constants
achou11 Nov 6, 2023
84c9f74
update test message
achou11 Nov 6, 2023
cc61a9b
Add failing test
gmaclennan Nov 7, 2023
58ed4f0
speed up timeout test with fake timers
gmaclennan Nov 7, 2023
c2503cc
bind this.getProject to this
gmaclennan Nov 7, 2023
95b5d62
use projectPublicId in blob-api
gmaclennan Nov 7, 2023
943dc63
temp fix to blobApi
gmaclennan Nov 7, 2023
b154eea
small plugin updates
achou11 Nov 7, 2023
4c38fff
Merge branch 'main' into media-server
achou11 Nov 7, 2023
e50272d
fix issues with start and stop
achou11 Nov 8, 2023
64861a3
remove accidental solo
achou11 Nov 8, 2023
9ae53a6
Merge branch 'main' into media-server
achou11 Nov 9, 2023
5b0d7e8
move fastify to direct deps
achou11 Nov 9, 2023
a90efe4
use call instead of bind for server.listen()
achou11 Nov 9, 2023
3d4bbd4
remove onRequest hook
achou11 Nov 9, 2023
3dfbf27
update BlobApi projectId opt to projectPublicId
achou11 Nov 9, 2023
2e13b42
comment out failing BlobApi test assertion
achou11 Nov 9, 2023
3282473
reinsert accidentally removed ts-expect-error
achou11 Nov 9, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 118 additions & 65 deletions package-lock.json

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
"depcheck": "^1.4.3",
"drizzle-kit": "^0.19.12",
"eslint": "^8.39.0",
"fastify": "^4.20.0",
"fastify": "^4.24.3",
"husky": "^8.0.0",
"light-my-request": "^5.10.0",
"lint-staged": "^14.0.1",
Expand All @@ -101,7 +101,8 @@
"ts-proto": "^1.156.7",
"typedoc": "^0.24.8",
"typedoc-plugin-markdown": "^3.15.3",
"typescript": "^5.1.6"
"typescript": "^5.1.6",
"undici": "^5.27.2"
},
"peerDependencies": {
"fastify": ">= 4"
Expand Down
37 changes: 22 additions & 15 deletions src/blob-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,24 @@ import { createHash } from 'node:crypto'
import sodium from 'sodium-universal'
import b4a from 'b4a'

import { getPort } from './blob-server/index.js'

/** @typedef {import('./types.js').BlobId} BlobId */
/** @typedef {import('./types.js').BlobType} BlobType */
/** @typedef {import('./types.js').BlobVariant<BlobType>} BlobVariant */

export class BlobApi {
#blobStore
#getMediaBaseUrl
#projectId

/**
* @param {object} options
* @param {string} options.projectId
* @param {import('./blob-store/index.js').BlobStore} options.blobStore
* @param {import('fastify').FastifyInstance} options.blobServer
* @param {() => Promise<string>} options.getMediaBaseUrl
*/
constructor({ projectId, blobStore, blobServer }) {
this.projectId = projectId
this.blobStore = blobStore
this.blobServer = blobServer
constructor({ projectId, blobStore, getMediaBaseUrl }) {
this.#blobStore = blobStore
this.#getMediaBaseUrl = getMediaBaseUrl
this.#projectId = projectId
}

/**
Expand All @@ -30,8 +31,14 @@ export class BlobApi {
*/
async getUrl(blobId) {
const { driveId, type, variant, name } = blobId
const port = await getPort(this.blobServer.server)
return `http://127.0.0.1:${port}/${this.projectId}/${driveId}/${type}/${variant}/${name}`

let base = await this.#getMediaBaseUrl()

if (!base.endsWith('/')) {
base += '/'
}

return base + `${this.#projectId}/${driveId}/${type}/${variant}/${name}`
}

/**
Expand All @@ -57,8 +64,8 @@ export class BlobApi {
variant: 'original',
type: blobType,
},
metadata,
contentHash
metadata
// contentHash
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fixes the e2e media-server test, but it breaks the blob-api test because the hash is incorrect.

)

if (preview) {
Expand Down Expand Up @@ -86,7 +93,7 @@ export class BlobApi {
}

return {
driveId: this.blobStore.writerDriveId,
driveId: this.#blobStore.writerDriveId,
name,
type: blobType,
hash: contentHash.digest('hex'),
Expand All @@ -108,7 +115,7 @@ export class BlobApi {
hash,

// @ts-ignore TODO: remove driveId property from createWriteStream
this.blobStore.createWriteStream({ type, variant, name }, { metadata })
this.#blobStore.createWriteStream({ type, variant, name }, { metadata })
)

return { name, variant, type, hash }
Expand All @@ -117,7 +124,7 @@ export class BlobApi {
// @ts-ignore TODO: return value types don't match pipeline's expectations, though they should
await pipeline(
fs.createReadStream(filepath),
this.blobStore.createWriteStream({ type, variant, name }, { metadata })
this.#blobStore.createWriteStream({ type, variant, name }, { metadata })
)

return { name, variant, type }
Expand Down
40 changes: 0 additions & 40 deletions src/blob-server/index.js

This file was deleted.

18 changes: 8 additions & 10 deletions src/blob-server/fastify-plugin.js → src/fastify-plugins/blobs.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
// @ts-check
import fp from 'fastify-plugin'
import { filetypemime } from 'magic-bytes.js'
import { Type as T } from '@sinclair/typebox'

import { SUPPORTED_BLOB_VARIANTS } from '../blob-store/index.js'
import { HEX_REGEX_32_BYTES, Z_BASE_32_REGEX_32_BYTES } from './constants.js'

export default fp(blobServerPlugin, {
fastify: '4.x',
name: 'mapeo-blob-server',
name: 'mapeo-blobs',
})

/** @typedef {import('../types.js').BlobId} BlobId */

/**
* @typedef {Object} BlobServerPluginOpts
*
* @property {(projectId: string) => import('../blob-store/index.js').BlobStore} getBlobStore
* @property {(projectPublicId: string) => Promise<import('../blob-store/index.js').BlobStore>} getBlobStore
*/

const BLOB_TYPES = /** @type {BlobId['type'][]} */ (
Expand All @@ -24,12 +24,10 @@ const BLOB_TYPES = /** @type {BlobId['type'][]} */ (
const BLOB_VARIANTS = [
...new Set(Object.values(SUPPORTED_BLOB_VARIANTS).flat()),
]
const HEX_REGEX_32_BYTES = '^[0-9a-fA-F]{64}$'
const HEX_STRING_32_BYTES = T.String({ pattern: HEX_REGEX_32_BYTES })

const PARAMS_JSON_SCHEMA = T.Object({
projectId: HEX_STRING_32_BYTES,
driveId: HEX_STRING_32_BYTES,
projectPublicId: T.String({ pattern: Z_BASE_32_REGEX_32_BYTES }),
driveId: T.String({ pattern: HEX_REGEX_32_BYTES }),
type: T.Union(
BLOB_TYPES.map((type) => {
return T.Literal(type)
Expand Down Expand Up @@ -57,10 +55,10 @@ async function routes(fastify, options) {
const { getBlobStore } = options

fastify.get(
'/:projectId/:driveId/:type/:variant/:name',
'/:projectPublicId/:driveId/:type/:variant/:name',
{ schema: { params: PARAMS_JSON_SCHEMA } },
async (request, reply) => {
const { projectId, ...blobId } = request.params
const { projectPublicId, ...blobId } = request.params

if (!isValidBlobId(blobId)) {
reply.code(400)
Expand All @@ -72,7 +70,7 @@ async function routes(fastify, options) {

let blobStore
try {
blobStore = getBlobStore(projectId)
blobStore = await getBlobStore(projectPublicId)
} catch (e) {
reply.code(404)
throw e
Expand Down
4 changes: 4 additions & 0 deletions src/fastify-plugins/constants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export const HEX_REGEX_32_BYTES = '^[0-9a-fA-F]{64}$'

// z-base-32 encoded 32 byte string (52 characters)
export const Z_BASE_32_REGEX_32_BYTES = '^[0-9a-zA-Z]{52}$'
25 changes: 24 additions & 1 deletion src/mapeo-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
import { RandomAccessFilePool } from './core-manager/random-access-file-pool.js'
import { LocalPeers } from './local-peers.js'
import { InviteApi } from './invite-api.js'
import { MediaServer } from './media-server.js'

/** @typedef {import("@mapeo/schema").ProjectSettingsValue} ProjectValue */

Expand All @@ -50,14 +51,16 @@ export class MapeoManager {
#deviceId
#rpc
#invite
#mediaServer

/**
* @param {Object} opts
* @param {Buffer} opts.rootKey 16-bytes of random data that uniquely identify the device, used to derive a 32-byte master key, which is used to derive all the keypairs used for Mapeo
* @param {string} opts.dbFolder Folder for sqlite Dbs. Folder must exist. Use ':memory:' to store everything in-memory
* @param {string | import('./types.js').CoreStorage} opts.coreStorage Folder for hypercore storage or a function that returns a RandomAccessStorage instance
* @param {{ port?: number, logger: import('fastify').FastifyServerOptions['logger'] }} [opts.mediaServerOpts]
*/
constructor({ rootKey, dbFolder, coreStorage }) {
constructor({ rootKey, dbFolder, coreStorage, mediaServerOpts }) {
this.#dbFolder = dbFolder
const sqlite = new Database(
dbFolder === ':memory:'
Expand Down Expand Up @@ -103,6 +106,11 @@ export class MapeoManager {
} else {
this.#coreStorage = coreStorage
}

this.#mediaServer = new MediaServer({
logger: mediaServerOpts?.logger,
getProject: this.getProject.bind(this),
})
}

/**
Expand Down Expand Up @@ -214,6 +222,9 @@ export class MapeoManager {
sharedDb: this.#db,
sharedIndexWriter: this.#projectSettingsIndexWriter,
rpc: this.#rpc,
getMediaBaseUrl: this.#mediaServer.getMediaAddress.bind(
this.#mediaServer
),
})

// 5. Write project name and any other relevant metadata to project instance
Expand Down Expand Up @@ -270,6 +281,7 @@ export class MapeoManager {
sharedDb: this.#db,
sharedIndexWriter: this.#projectSettingsIndexWriter,
rpc: this.#rpc,
getMediaBaseUrl: this.#mediaServer.getMediaAddress,
})

// 3. Keep track of project instance as we know it's a properly existing project
Expand Down Expand Up @@ -425,4 +437,15 @@ export class MapeoManager {
get invite() {
return this.#invite
}

/**
* @param {import('./media-server.js').StartOpts} [opts]
*/
async start(opts) {
await this.#mediaServer.start(opts)
}

async stop() {
await this.#mediaServer.stop()
}
}
30 changes: 17 additions & 13 deletions src/mapeo-project.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import { CoreManager, NAMESPACES } from './core-manager/index.js'
import { DataStore } from './datastore/index.js'
import { DataType, kCreateWithDocId } from './datatype/index.js'
import { BlobStore } from './blob-store/index.js'
import { createBlobServer } from './blob-server/index.js'
import { BlobApi } from './blob-api.js'
import { IndexWriter } from './index-writer/index.js'
import { projectSettingsTable } from './schema/client.js'
Expand All @@ -30,7 +29,12 @@ import {
mapAndValidateCoreOwnership,
} from './core-ownership.js'
import { Capabilities } from './capabilities.js'
import { getDeviceId, projectKeyToId, valueOf } from './utils.js'
import {
getDeviceId,
projectKeyToId,
projectKeyToPublicId,
valueOf,
} from './utils.js'
import { MemberApi } from './member-api.js'
import { SyncController } from './sync/sync-controller.js'

Expand All @@ -42,6 +46,7 @@ export const kCoreOwnership = Symbol('coreOwnership')
export const kCapabilities = Symbol('capabilities')
export const kSetOwnDeviceInfo = Symbol('kSetOwnDeviceInfo')
export const kReplicate = Symbol('replicate')
export const kBlobStore = Symbol('blobStore')

export class MapeoProject {
#projectId
Expand All @@ -50,12 +55,12 @@ export class MapeoProject {
#dataStores
#dataTypes
#blobStore
#blobServer
#coreOwnership
#capabilities
#ownershipWriteDone
#memberApi
#syncController
#projectPublicId

/**
* @param {Object} opts
Expand All @@ -68,6 +73,7 @@ export class MapeoProject {
* @param {IndexWriter} opts.sharedIndexWriter
* @param {import('./types.js').CoreStorage} opts.coreStorage Folder to store all hypercore data
* @param {import('./local-peers.js').LocalPeers} opts.rpc
* @param {(mediaType: 'blobs' | 'icons') => Promise<string>} opts.getMediaBaseUrl
*
*/
constructor({
Expand All @@ -80,9 +86,11 @@ export class MapeoProject {
projectSecretKey,
encryptionKeys,
rpc,
getMediaBaseUrl,
}) {
this.#deviceId = getDeviceId(keyManager)
this.#projectId = projectKeyToId(projectKey)
this.#projectPublicId = projectKeyToPublicId(projectKey)

///////// 1. Setup database
const sqlite = new Database(dbPath)
Expand Down Expand Up @@ -205,18 +213,10 @@ export class MapeoProject {
coreManager: this.#coreManager,
})

this.#blobServer = createBlobServer({
logger: true,
blobStore: this.#blobStore,
prefix: '/blobs/',
projectId: this.#projectId,
})

// @ts-ignore TODO: pass in blobServer
this.$blobs = new BlobApi({
projectId: this.#projectId,
projectId: this.#projectPublicId,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename this in the PR to avoid confusion later

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed via 3dfbf27

blobStore: this.#blobStore,
blobServer: this.#blobServer,
getMediaBaseUrl: async () => getMediaBaseUrl('blobs'),
})

this.#coreOwnership = new CoreOwnership({
Expand Down Expand Up @@ -286,6 +286,10 @@ export class MapeoProject {
return this.#capabilities
}

get [kBlobStore]() {
return this.#blobStore
}

get deviceId() {
return this.#deviceId
}
Expand Down
Loading
Loading