Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add $sync API methods #361

Merged
merged 15 commits into from
Nov 9, 2023
12 changes: 8 additions & 4 deletions src/mapeo-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const CLIENT_SQLITE_FILE_NAME = 'client.db'
const MAX_FILE_DESCRIPTORS = 768

export const kRPC = Symbol('rpc')
export const kManagerReplicate = Symbol('replicate manager')

/**
* @typedef {Omit<import('./local-peers.js').PeerInfo, 'protomux'>} PublicPeerInfo
Expand Down Expand Up @@ -129,7 +130,7 @@ export class MapeoManager extends TypedEmitter {
this.#localDiscovery = new LocalDiscovery({
identityKeypair: this.#keyManager.getIdentityKeypair(),
})
this.#localDiscovery.on('connection', this.replicate.bind(this))
this.#localDiscovery.on('connection', this[kManagerReplicate].bind(this))
}

/**
Expand All @@ -140,12 +141,15 @@ export class MapeoManager extends TypedEmitter {
}

/**
* Replicate Mapeo to a `@hyperswarm/secret-stream`. Should only be used for
* local (trusted) connections, because the RPC channel key is public
* Replicate Mapeo to a `@hyperswarm/secret-stream`. This replication connects
* the Mapeo RPC channel and allows invites. All active projects will sync
* automatically to this replication stream. Only use for local (trusted)
* connections, because the RPC channel key is public. To sync a specific
* project without connecting RPC, use project[kProjectReplication].
*
* @param {import('@hyperswarm/secret-stream')<any>} noiseStream
*/
replicate(noiseStream) {
[kManagerReplicate](noiseStream) {
const replicationStream = this.#localPeers.connect(noiseStream)
Promise.all([this.getDeviceInfo(), openedNoiseSecretStream(noiseStream)])
.then(([{ name }, openedNoiseStream]) => {
Expand Down
35 changes: 25 additions & 10 deletions src/mapeo-project.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import {
import { Capabilities } from './capabilities.js'
import { getDeviceId, projectKeyToId, valueOf } from './utils.js'
import { MemberApi } from './member-api.js'
import { SyncController } from './sync/sync-controller.js'
import { SyncApi, kSyncReplicate } from './sync/sync-api.js'
import Hypercore from 'hypercore'

/** @typedef {Omit<import('@mapeo/schema').ProjectSettingsValue, 'schemaName'>} EditableProjectSettings */
Expand All @@ -42,7 +42,7 @@ const INDEXER_STORAGE_FOLDER_NAME = 'indexer'
export const kCoreOwnership = Symbol('coreOwnership')
export const kCapabilities = Symbol('capabilities')
export const kSetOwnDeviceInfo = Symbol('kSetOwnDeviceInfo')
export const kReplicate = Symbol('replicate')
export const kProjectReplicate = Symbol('replicate project')

export class MapeoProject {
#projectId
Expand All @@ -56,7 +56,7 @@ export class MapeoProject {
#capabilities
#ownershipWriteDone
#memberApi
#syncController
#syncApi

/**
* @param {Object} opts
Expand Down Expand Up @@ -245,15 +245,17 @@ export class MapeoProject {
},
})

this.#syncController = new SyncController({
this.#syncApi = new SyncApi({
coreManager: this.#coreManager,
capabilities: this.#capabilities,
})

///////// 4. Wire up sync

// Replicate already connected local peers
for (const peer of localPeers.peers) {
if (peer.status !== 'connected') continue
this.#syncController.replicate(peer.protomux)
this.#syncApi[kSyncReplicate](peer.protomux)
}

localPeers.on('discovery-key', (discoveryKey, stream) => {
Expand All @@ -267,10 +269,10 @@ export class MapeoProject {
// When a new peer is found, try to replicate (if it is not a member of the
// project it will fail the capability check and be ignored)
localPeers.on('peer-add', (peer) => {
this.#syncController.replicate(peer.protomux)
this.#syncApi[kSyncReplicate](peer.protomux)
})

///////// 4. Write core ownership record
///////// 5. Write core ownership record

const deferred = pDefer()
// Avoid uncaught rejection. If this is rejected then project.ready() will reject
Expand Down Expand Up @@ -365,6 +367,10 @@ export class MapeoProject {
return this.#memberApi
}

get $sync() {
return this.#syncApi
}

/**
* @param {Partial<EditableProjectSettings>} settings
* @returns {Promise<EditableProjectSettings>}
Expand Down Expand Up @@ -416,15 +422,24 @@ export class MapeoProject {
}

/**
* Replicate a project to a @hyperswarm/secret-stream. Invites will not
* function because the RPC channel is not connected for project replication,
* and only this project will replicate (to replicate multiple projects you
* need to replicate the manager instance via manager[kManagerReplicate])
*
* @param {Exclude<Parameters<Hypercore.createProtocolStream>[0], boolean>} stream A duplex stream, a @hyperswarm/secret-stream, or a Protomux instance
* @returns
*/
[kReplicate](stream) {
const replicationStream = Hypercore.createProtocolStream(stream, {})
[kProjectReplicate](stream) {
const replicationStream = Hypercore.createProtocolStream(stream, {
ondiscoverykey: async (discoveryKey) => {
this.#coreManager.handleDiscoveryKey(discoveryKey, replicationStream)
},
})
const protomux = replicationStream.noiseStream.userData
// @ts-ignore - got fed up jumping through hoops to keep TS heppy
return this.#syncController.replicate(protomux)
this.#syncApi[kSyncReplicate](protomux)
return replicationStream
}

/**
Expand Down
82 changes: 82 additions & 0 deletions src/sync/sync-api.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { TypedEmitter } from 'tiny-typed-emitter'
import { SyncState } from './sync-state.js'
import { PeerSyncController } from './peer-sync-controller.js'

export const kSyncReplicate = Symbol('replicate sync')

/**
* @typedef {object} SyncEvents
* @property {(syncState: import('./sync-state.js').State) => void} sync-state
*/

/**
* @extends {TypedEmitter<SyncEvents>}
*/
export class SyncApi extends TypedEmitter {
syncState
#coreManager
#capabilities
/** @type {Map<import('protomux'), PeerSyncController>} */
#peerSyncControllers = new Map()
/** @type {Set<'local' | 'remote'>} */
#dataSyncEnabled = new Set()

/**
*
* @param {object} opts
* @param {import('../core-manager/index.js').CoreManager} opts.coreManager
* @param {import("../capabilities.js").Capabilities} opts.capabilities
* @param {number} [opts.throttleMs]
*/
constructor({ coreManager, throttleMs = 200, capabilities }) {
super()
this.#coreManager = coreManager
this.#capabilities = capabilities
this.syncState = new SyncState({ coreManager, throttleMs })
this.syncState.on('state', this.emit.bind(this, 'sync-state'))
}

getState() {
return this.syncState.getState()
}

/**
* Start syncing data cores
*/
start() {
if (this.#dataSyncEnabled.has('local')) return
this.#dataSyncEnabled.add('local')
for (const peerSyncController of this.#peerSyncControllers.values()) {
peerSyncController.enableDataSync()
}
}

/**
* Stop syncing data cores (metadata cores will continue syncing in the background)
*/
stop() {
if (!this.#dataSyncEnabled.has('local')) return
this.#dataSyncEnabled.delete('local')
for (const peerSyncController of this.#peerSyncControllers.values()) {
peerSyncController.disableDataSync()
}
}

/**
* @param {import('protomux')<import('@hyperswarm/secret-stream')>} protomux A protomux instance
*/
[kSyncReplicate](protomux) {
if (this.#peerSyncControllers.has(protomux)) return

const peerSyncController = new PeerSyncController({
protomux,
coreManager: this.#coreManager,
syncState: this.syncState,
capabilities: this.#capabilities,
})
if (this.#dataSyncEnabled.has('local')) {
peerSyncController.enableDataSync()
}
this.#peerSyncControllers.set(protomux, peerSyncController)
}
}
44 changes: 0 additions & 44 deletions src/sync/sync-controller.js

This file was deleted.

Loading