Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: integrate LocalDiscovery & LocalPeers #358

Merged
merged 1 commit into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions src/local-peers.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const MESSAGES_MAX_ID = Math.max.apply(null, [...Object.values(MESSAGE_TYPES)])
* @property {string | undefined} name
*/
/** @typedef {PeerInfoBase & { status: 'connecting' }} PeerInfoConnecting */
/** @typedef {PeerInfoBase & { status: 'connected', connectedAt: number, protomux: Protomux }} PeerInfoConnected */
/** @typedef {PeerInfoBase & { status: 'connected', connectedAt: number, protomux: Protomux<import('@hyperswarm/secret-stream')> }} PeerInfoConnected */
/** @typedef {PeerInfoBase & { status: 'disconnected', disconnectedAt: number }} PeerInfoDisconnected */

/** @typedef {PeerInfoConnecting | PeerInfoConnected | PeerInfoDisconnected} PeerInfoInternal */
Expand All @@ -57,7 +57,7 @@ class Peer {
#name
#connectedAt = 0
#disconnectedAt = 0
/** @type {Protomux} */
/** @type {Protomux<import('@hyperswarm/secret-stream')>} */
#protomux

/**
Expand Down Expand Up @@ -103,7 +103,7 @@ class Peer {
}
}
}
/** @param {Protomux} protomux */
/** @param {Protomux<import('@hyperswarm/secret-stream')>} protomux */
connect(protomux) {
this.#protomux = protomux
/* c8 ignore next 3 */
Expand Down Expand Up @@ -166,7 +166,9 @@ class Peer {
/**
* @typedef {object} LocalPeersEvents
* @property {(peers: PeerInfo[]) => void} peers Emitted whenever the connection status of peers changes. An array of peerInfo objects with a peer id and the peer connection status
* @property {(peer: PeerInfoConnected) => void} peer-add Emitted when a new peer is connected
* @property {(peerId: string, invite: InviteWithKeys) => void} invite Emitted when an invite is received
* @property {(discoveryKey: Buffer, stream: import('./types.js').ReplicationStream) => void} discovery-key Emitted when a new hypercore is replicated (by a peer) to a peer replication stream (passed as the second parameter)
*/

/** @extends {TypedEmitter<LocalPeersEvents>} */
Expand Down Expand Up @@ -272,6 +274,13 @@ export class LocalPeers extends TypedEmitter {
stream.userData = protomux
this.#opening.add(stream.opened)

protomux.pair(
{ protocol: 'hypercore/alpha' },
/** @param {Buffer} discoveryKey */ async (discoveryKey) => {
this.emit('discovery-key', discoveryKey, stream.rawStream)
}
)

// No need to connect error handler to stream because Protomux does this,
// and errors are eventually handled by #closePeer

Expand Down Expand Up @@ -319,16 +328,16 @@ export class LocalPeers extends TypedEmitter {

/**
* @param {Buffer} publicKey
* @param {Protomux} protomux
* @param {Protomux<import('@hyperswarm/secret-stream')>} protomux
*/
#openPeer(publicKey, protomux) {
const peerId = keyToId(publicKey)
const peer = this.#peers.get(peerId)
/* c8 ignore next */
if (!peer) return // TODO: report error - this should not happen
const wasConnected = peer.info.status === 'connected'
peer.connect(protomux)
if (!wasConnected) this.#emitPeers()
this.#emitPeers()
this.emit('peer-add', /** @type {PeerInfoConnected} */ (peer.info))
}

/** @param {Buffer} publicKey */
Expand Down
67 changes: 49 additions & 18 deletions src/mapeo-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ import { ProjectKeys } from './generated/keys.js'
import {
deNullify,
getDeviceId,
keyToId,
openedNoiseSecretStream,
projectIdToNonce,
projectKeyToId,
projectKeyToPublicId,
} from './utils.js'
import { RandomAccessFilePool } from './core-manager/random-access-file-pool.js'
import { LocalPeers } from './local-peers.js'
import { InviteApi } from './invite-api.js'
import { LocalDiscovery } from './discovery/local-discovery.js'

/** @typedef {import("@mapeo/schema").ProjectSettingsValue} ProjectValue */

Expand All @@ -48,8 +51,9 @@ export class MapeoManager {
#coreStorage
#dbFolder
#deviceId
#rpc
#localPeers
#invite
#localDiscovery

/**
* @param {Object} opts
Expand All @@ -69,7 +73,7 @@ export class MapeoManager {
migrationsFolder: new URL('../drizzle/client', import.meta.url).pathname,
})

this.#rpc = new LocalPeers()
this.#localPeers = new LocalPeers()
this.#keyManager = new KeyManager(rootKey)
this.#deviceId = getDeviceId(this.#keyManager)
this.#projectSettingsIndexWriter = new IndexWriter({
Expand All @@ -79,7 +83,7 @@ export class MapeoManager {
this.#activeProjects = new Map()

this.#invite = new InviteApi({
rpc: this.#rpc,
rpc: this.#localPeers,
queries: {
isMember: async (projectId) => {
const projectExists = this.#db
Expand All @@ -99,17 +103,43 @@ export class MapeoManager {
if (typeof coreStorage === 'string') {
const pool = new RandomAccessFilePool(MAX_FILE_DESCRIPTORS)
// @ts-ignore
this.#coreStorage = Hypercore.createStorage(coreStorage, { pool })
this.#coreStorage = Hypercore.defaultStorage(coreStorage, { pool })
} else {
this.#coreStorage = coreStorage
}

this.#localDiscovery = new LocalDiscovery({
identityKeypair: this.#keyManager.getIdentityKeypair(),
})
this.#localDiscovery.on('connection', this.replicate.bind(this))
}

/**
* MapeoRPC instance, used for tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* MapeoRPC instance, used for tests
* LocalPeers instance, used for tests

*/
get [kRPC]() {
return this.#rpc
return this.#localPeers
}

/**
* Replicate Mapeo to a `@hyperswarm/secret-stream`. Should only be used for
* local (trusted) connections, because the RPC channel key is public
*
* @param {import('@hyperswarm/secret-stream')<any>} noiseStream
*/
replicate(noiseStream) {
const replicationStream = this.#localPeers.connect(noiseStream)
Promise.all([this.getDeviceInfo(), openedNoiseSecretStream(noiseStream)])
.then(([{ name }, openedNoiseStream]) => {
if (openedNoiseStream.destroyed || !name) return
const peerId = keyToId(openedNoiseStream.remotePublicKey)
return this.#localPeers.sendDeviceInfo(peerId, { name })
})
.catch((e) => {
// Ignore error but log
console.error('Failed to send device info to peer', e)
})
return replicationStream
}

/**
Expand Down Expand Up @@ -205,15 +235,10 @@ export class MapeoManager {
})

// 4. Create MapeoProject instance
const project = new MapeoProject({
...this.#projectStorage(projectId),
const project = this.#createProjectInstance({
encryptionKeys,
keyManager: this.#keyManager,
projectKey: projectKeypair.publicKey,
projectSecretKey: projectKeypair.secretKey,
sharedDb: this.#db,
sharedIndexWriter: this.#projectSettingsIndexWriter,
rpc: this.#rpc,
})

// 5. Write project name and any other relevant metadata to project instance
Expand Down Expand Up @@ -263,19 +288,25 @@ export class MapeoManager {
projectId
)

const project = new MapeoProject({
const project = this.#createProjectInstance(projectKeys)

// 3. Keep track of project instance as we know it's a properly existing project
this.#activeProjects.set(projectPublicId, project)

return project
}

/** @param {ProjectKeys} projectKeys */
#createProjectInstance(projectKeys) {
const projectId = keyToId(projectKeys.projectKey)
return new MapeoProject({
...this.#projectStorage(projectId),
...projectKeys,
keyManager: this.#keyManager,
sharedDb: this.#db,
sharedIndexWriter: this.#projectSettingsIndexWriter,
rpc: this.#rpc,
localPeers: this.#localPeers,
})

// 3. Keep track of project instance as we know it's a properly existing project
this.#activeProjects.set(projectPublicId, project)

return project
}

/**
Expand Down
34 changes: 29 additions & 5 deletions src/mapeo-project.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { Capabilities } from './capabilities.js'
import { getDeviceId, projectKeyToId, valueOf } from './utils.js'
import { MemberApi } from './member-api.js'
import { SyncController } from './sync/sync-controller.js'
import Hypercore from 'hypercore'

/** @typedef {Omit<import('@mapeo/schema').ProjectSettingsValue, 'schemaName'>} EditableProjectSettings */

Expand Down Expand Up @@ -67,7 +68,7 @@ export class MapeoProject {
* @param {import('drizzle-orm/better-sqlite3').BetterSQLite3Database} opts.sharedDb
* @param {IndexWriter} opts.sharedIndexWriter
* @param {import('./types.js').CoreStorage} opts.coreStorage Folder to store all hypercore data
* @param {import('./local-peers.js').LocalPeers} opts.rpc
* @param {import('./local-peers.js').LocalPeers} opts.localPeers
*
*/
constructor({
Expand All @@ -79,7 +80,7 @@ export class MapeoProject {
projectKey,
projectSecretKey,
encryptionKeys,
rpc,
localPeers,
}) {
this.#deviceId = getDeviceId(keyManager)
this.#projectId = projectKeyToId(projectKey)
Expand Down Expand Up @@ -237,7 +238,7 @@ export class MapeoProject {
// @ts-expect-error
encryptionKeys,
projectKey,
rpc,
rpc: localPeers,
dataTypes: {
deviceInfo: this.#dataTypes.deviceInfo,
project: this.#dataTypes.projectSettings,
Expand All @@ -249,6 +250,26 @@ export class MapeoProject {
capabilities: this.#capabilities,
})

// Replicate already connected local peers
for (const peer of localPeers.peers) {
if (peer.status !== 'connected') continue
this.#syncController.replicate(peer.protomux)
}

localPeers.on('discovery-key', (discoveryKey, stream) => {
// The core identified by this discovery key might not be part of this
// project, but we can't know that so we will request it from the peer if
// we don't have it. The peer will not return the core key unless it _is_
// part of this project
this.#coreManager.handleDiscoveryKey(discoveryKey, stream)
})

// When a new peer is found, try to replicate (if it is not a member of the
// project it will fail the capability check and be ignored)
localPeers.on('peer-add', (peer) => {
this.#syncController.replicate(peer.protomux)
})

///////// 4. Write core ownership record

const deferred = pDefer()
Expand Down Expand Up @@ -396,11 +417,14 @@ export class MapeoProject {

/**
*
* @param {import('./types.js').ReplicationStream} stream
* @param {Exclude<Parameters<Hypercore.createProtocolStream>[0], boolean>} stream A duplex stream, a @hyperswarm/secret-stream, or a Protomux instance
* @returns
*/
[kReplicate](stream) {
return this.#syncController.replicate(stream)
const replicationStream = Hypercore.createProtocolStream(stream, {})
const protomux = replicationStream.noiseStream.userData
// @ts-ignore - got fed up jumping through hoops to keep TS heppy
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol

Suggested change
// @ts-ignore - got fed up jumping through hoops to keep TS heppy
// @ts-expect-error - got fed up jumping through hoops to keep TS happy

return this.#syncController.replicate(protomux)
}

/**
Expand Down
35 changes: 4 additions & 31 deletions src/sync/sync-controller.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import Hypercore from 'hypercore'
import { TypedEmitter } from 'tiny-typed-emitter'
import Protomux from 'protomux'
import { SyncState } from './sync-state.js'
import { PeerSyncController } from './peer-sync-controller.js'

export class SyncController extends TypedEmitter {
#syncState
#coreManager
#capabilities
/** @type {Map<Protomux, PeerSyncController>} */
/** @type {Map<import('protomux'), PeerSyncController>} */
#peerSyncControllers = new Map()

/**
Expand All @@ -30,35 +28,10 @@ export class SyncController extends TypedEmitter {
}

/**
* @param {Exclude<Parameters<Hypercore.createProtocolStream>[0], boolean>} stream A duplex stream, a @hyperswarm/secret-stream, or a Protomux instance
* @param {import('protomux')<import('@hyperswarm/secret-stream')>} protomux A protomux instance
*/
replicate(stream) {
if (
Protomux.isProtomux(stream) ||
('userData' in stream && Protomux.isProtomux(stream.userData)) ||
('noiseStream' in stream &&
Protomux.isProtomux(stream.noiseStream.userData))
) {
console.warn(
'Passed an existing protocol stream to syncController.replicate(). Currently any pairing for the `hypercore/alpha` protocol is overwritten'
)
}
const protocolStream = Hypercore.createProtocolStream(stream, {
ondiscoverykey: /** @param {Buffer} discoveryKey */ (discoveryKey) => {
return this.#coreManager.handleDiscoveryKey(discoveryKey, stream)
},
})
const protomux =
// Need to coerce this until we update Hypercore.createProtocolStream types
/** @type {import('protomux')<import('@hyperswarm/secret-stream')>} */ (
protocolStream.noiseStream.userData
)
if (!protomux) throw new Error('Invalid stream')

if (this.#peerSyncControllers.has(protomux)) {
console.warn('Already replicating to this stream')
return
}
replicate(protomux) {
if (this.#peerSyncControllers.has(protomux)) return

const peerSyncController = new PeerSyncController({
protomux,
Expand Down
2 changes: 1 addition & 1 deletion test-types/data-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const mapeoProject = new MapeoProject({
tables: [projectSettingsTable],
sqlite,
}),
rpc: new LocalPeers(),
localPeers: new LocalPeers(),
})

///// Observations
Expand Down
Loading