From a32cab03c0111e41db44b89abfc8776657dd1b9e Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Thu, 9 Nov 2023 15:52:37 +0900 Subject: [PATCH 1/4] feat: integrate LocalDiscovery & LocalPeers (#358) --- src/local-peers.js | 21 ++++++++---- src/mapeo-manager.js | 67 +++++++++++++++++++++++++++---------- src/mapeo-project.js | 34 ++++++++++++++++--- src/sync/sync-controller.js | 35 +++---------------- test-types/data-types.ts | 2 +- 5 files changed, 98 insertions(+), 61 deletions(-) diff --git a/src/local-peers.js b/src/local-peers.js index 5989a5bd8..32c0d2305 100644 --- a/src/local-peers.js +++ b/src/local-peers.js @@ -30,7 +30,7 @@ const MESSAGES_MAX_ID = Math.max.apply(null, [...Object.values(MESSAGE_TYPES)]) * @property {string | undefined} name */ /** @typedef {PeerInfoBase & { status: 'connecting' }} PeerInfoConnecting */ -/** @typedef {PeerInfoBase & { status: 'connected', connectedAt: number, protomux: Protomux }} PeerInfoConnected */ +/** @typedef {PeerInfoBase & { status: 'connected', connectedAt: number, protomux: Protomux }} PeerInfoConnected */ /** @typedef {PeerInfoBase & { status: 'disconnected', disconnectedAt: number }} PeerInfoDisconnected */ /** @typedef {PeerInfoConnecting | PeerInfoConnected | PeerInfoDisconnected} PeerInfoInternal */ @@ -57,7 +57,7 @@ class Peer { #name #connectedAt = 0 #disconnectedAt = 0 - /** @type {Protomux} */ + /** @type {Protomux} */ #protomux /** @@ -103,7 +103,7 @@ class Peer { } } } - /** @param {Protomux} protomux */ + /** @param {Protomux} protomux */ connect(protomux) { this.#protomux = protomux /* c8 ignore next 3 */ @@ -166,7 +166,9 @@ class Peer { /** * @typedef {object} LocalPeersEvents * @property {(peers: PeerInfo[]) => void} peers Emitted whenever the connection status of peers changes. An array of peerInfo objects with a peer id and the peer connection status + * @property {(peer: PeerInfoConnected) => void} peer-add Emitted when a new peer is connected * @property {(peerId: string, invite: InviteWithKeys) => void} invite Emitted when an invite is received + * @property {(discoveryKey: Buffer, stream: import('./types.js').ReplicationStream) => void} discovery-key Emitted when a new hypercore is replicated (by a peer) to a peer replication stream (passed as the second parameter) */ /** @extends {TypedEmitter} */ @@ -272,6 +274,13 @@ export class LocalPeers extends TypedEmitter { stream.userData = protomux this.#opening.add(stream.opened) + protomux.pair( + { protocol: 'hypercore/alpha' }, + /** @param {Buffer} discoveryKey */ async (discoveryKey) => { + this.emit('discovery-key', discoveryKey, stream.rawStream) + } + ) + // No need to connect error handler to stream because Protomux does this, // and errors are eventually handled by #closePeer @@ -319,16 +328,16 @@ export class LocalPeers extends TypedEmitter { /** * @param {Buffer} publicKey - * @param {Protomux} protomux + * @param {Protomux} protomux */ #openPeer(publicKey, protomux) { const peerId = keyToId(publicKey) const peer = this.#peers.get(peerId) /* c8 ignore next */ if (!peer) return // TODO: report error - this should not happen - const wasConnected = peer.info.status === 'connected' peer.connect(protomux) - if (!wasConnected) this.#emitPeers() + this.#emitPeers() + this.emit('peer-add', /** @type {PeerInfoConnected} */ (peer.info)) } /** @param {Buffer} publicKey */ diff --git a/src/mapeo-manager.js b/src/mapeo-manager.js index 32965a237..e37110e2e 100644 --- a/src/mapeo-manager.js +++ b/src/mapeo-manager.js @@ -17,6 +17,8 @@ import { ProjectKeys } from './generated/keys.js' import { deNullify, getDeviceId, + keyToId, + openedNoiseSecretStream, projectIdToNonce, projectKeyToId, projectKeyToPublicId, @@ -24,6 +26,7 @@ import { import { RandomAccessFilePool } from './core-manager/random-access-file-pool.js' import { LocalPeers } from './local-peers.js' import { InviteApi } from './invite-api.js' +import { LocalDiscovery } from './discovery/local-discovery.js' /** @typedef {import("@mapeo/schema").ProjectSettingsValue} ProjectValue */ @@ -48,8 +51,9 @@ export class MapeoManager { #coreStorage #dbFolder #deviceId - #rpc + #localPeers #invite + #localDiscovery /** * @param {Object} opts @@ -69,7 +73,7 @@ export class MapeoManager { migrationsFolder: new URL('../drizzle/client', import.meta.url).pathname, }) - this.#rpc = new LocalPeers() + this.#localPeers = new LocalPeers() this.#keyManager = new KeyManager(rootKey) this.#deviceId = getDeviceId(this.#keyManager) this.#projectSettingsIndexWriter = new IndexWriter({ @@ -79,7 +83,7 @@ export class MapeoManager { this.#activeProjects = new Map() this.#invite = new InviteApi({ - rpc: this.#rpc, + rpc: this.#localPeers, queries: { isMember: async (projectId) => { const projectExists = this.#db @@ -99,17 +103,43 @@ export class MapeoManager { if (typeof coreStorage === 'string') { const pool = new RandomAccessFilePool(MAX_FILE_DESCRIPTORS) // @ts-ignore - this.#coreStorage = Hypercore.createStorage(coreStorage, { pool }) + this.#coreStorage = Hypercore.defaultStorage(coreStorage, { pool }) } else { this.#coreStorage = coreStorage } + + this.#localDiscovery = new LocalDiscovery({ + identityKeypair: this.#keyManager.getIdentityKeypair(), + }) + this.#localDiscovery.on('connection', this.replicate.bind(this)) } /** * MapeoRPC instance, used for tests */ get [kRPC]() { - return this.#rpc + return this.#localPeers + } + + /** + * Replicate Mapeo to a `@hyperswarm/secret-stream`. Should only be used for + * local (trusted) connections, because the RPC channel key is public + * + * @param {import('@hyperswarm/secret-stream')} noiseStream + */ + replicate(noiseStream) { + const replicationStream = this.#localPeers.connect(noiseStream) + Promise.all([this.getDeviceInfo(), openedNoiseSecretStream(noiseStream)]) + .then(([{ name }, openedNoiseStream]) => { + if (openedNoiseStream.destroyed || !name) return + const peerId = keyToId(openedNoiseStream.remotePublicKey) + return this.#localPeers.sendDeviceInfo(peerId, { name }) + }) + .catch((e) => { + // Ignore error but log + console.error('Failed to send device info to peer', e) + }) + return replicationStream } /** @@ -205,15 +235,10 @@ export class MapeoManager { }) // 4. Create MapeoProject instance - const project = new MapeoProject({ - ...this.#projectStorage(projectId), + const project = this.#createProjectInstance({ encryptionKeys, - keyManager: this.#keyManager, projectKey: projectKeypair.publicKey, projectSecretKey: projectKeypair.secretKey, - sharedDb: this.#db, - sharedIndexWriter: this.#projectSettingsIndexWriter, - rpc: this.#rpc, }) // 5. Write project name and any other relevant metadata to project instance @@ -263,19 +288,25 @@ export class MapeoManager { projectId ) - const project = new MapeoProject({ + const project = this.#createProjectInstance(projectKeys) + + // 3. Keep track of project instance as we know it's a properly existing project + this.#activeProjects.set(projectPublicId, project) + + return project + } + + /** @param {ProjectKeys} projectKeys */ + #createProjectInstance(projectKeys) { + const projectId = keyToId(projectKeys.projectKey) + return new MapeoProject({ ...this.#projectStorage(projectId), ...projectKeys, keyManager: this.#keyManager, sharedDb: this.#db, sharedIndexWriter: this.#projectSettingsIndexWriter, - rpc: this.#rpc, + localPeers: this.#localPeers, }) - - // 3. Keep track of project instance as we know it's a properly existing project - this.#activeProjects.set(projectPublicId, project) - - return project } /** diff --git a/src/mapeo-project.js b/src/mapeo-project.js index ed24cbc50..cf9805174 100644 --- a/src/mapeo-project.js +++ b/src/mapeo-project.js @@ -33,6 +33,7 @@ import { Capabilities } from './capabilities.js' import { getDeviceId, projectKeyToId, valueOf } from './utils.js' import { MemberApi } from './member-api.js' import { SyncController } from './sync/sync-controller.js' +import Hypercore from 'hypercore' /** @typedef {Omit} EditableProjectSettings */ @@ -67,7 +68,7 @@ export class MapeoProject { * @param {import('drizzle-orm/better-sqlite3').BetterSQLite3Database} opts.sharedDb * @param {IndexWriter} opts.sharedIndexWriter * @param {import('./types.js').CoreStorage} opts.coreStorage Folder to store all hypercore data - * @param {import('./local-peers.js').LocalPeers} opts.rpc + * @param {import('./local-peers.js').LocalPeers} opts.localPeers * */ constructor({ @@ -79,7 +80,7 @@ export class MapeoProject { projectKey, projectSecretKey, encryptionKeys, - rpc, + localPeers, }) { this.#deviceId = getDeviceId(keyManager) this.#projectId = projectKeyToId(projectKey) @@ -237,7 +238,7 @@ export class MapeoProject { // @ts-expect-error encryptionKeys, projectKey, - rpc, + rpc: localPeers, dataTypes: { deviceInfo: this.#dataTypes.deviceInfo, project: this.#dataTypes.projectSettings, @@ -249,6 +250,26 @@ export class MapeoProject { capabilities: this.#capabilities, }) + // Replicate already connected local peers + for (const peer of localPeers.peers) { + if (peer.status !== 'connected') continue + this.#syncController.replicate(peer.protomux) + } + + localPeers.on('discovery-key', (discoveryKey, stream) => { + // The core identified by this discovery key might not be part of this + // project, but we can't know that so we will request it from the peer if + // we don't have it. The peer will not return the core key unless it _is_ + // part of this project + this.#coreManager.handleDiscoveryKey(discoveryKey, stream) + }) + + // When a new peer is found, try to replicate (if it is not a member of the + // project it will fail the capability check and be ignored) + localPeers.on('peer-add', (peer) => { + this.#syncController.replicate(peer.protomux) + }) + ///////// 4. Write core ownership record const deferred = pDefer() @@ -396,11 +417,14 @@ export class MapeoProject { /** * - * @param {import('./types.js').ReplicationStream} stream + * @param {Exclude[0], boolean>} stream A duplex stream, a @hyperswarm/secret-stream, or a Protomux instance * @returns */ [kReplicate](stream) { - return this.#syncController.replicate(stream) + const replicationStream = Hypercore.createProtocolStream(stream, {}) + const protomux = replicationStream.noiseStream.userData + // @ts-ignore - got fed up jumping through hoops to keep TS heppy + return this.#syncController.replicate(protomux) } /** diff --git a/src/sync/sync-controller.js b/src/sync/sync-controller.js index f68d2f6e1..a32b6cb95 100644 --- a/src/sync/sync-controller.js +++ b/src/sync/sync-controller.js @@ -1,6 +1,4 @@ -import Hypercore from 'hypercore' import { TypedEmitter } from 'tiny-typed-emitter' -import Protomux from 'protomux' import { SyncState } from './sync-state.js' import { PeerSyncController } from './peer-sync-controller.js' @@ -8,7 +6,7 @@ export class SyncController extends TypedEmitter { #syncState #coreManager #capabilities - /** @type {Map} */ + /** @type {Map} */ #peerSyncControllers = new Map() /** @@ -30,35 +28,10 @@ export class SyncController extends TypedEmitter { } /** - * @param {Exclude[0], boolean>} stream A duplex stream, a @hyperswarm/secret-stream, or a Protomux instance + * @param {import('protomux')} protomux A protomux instance */ - replicate(stream) { - if ( - Protomux.isProtomux(stream) || - ('userData' in stream && Protomux.isProtomux(stream.userData)) || - ('noiseStream' in stream && - Protomux.isProtomux(stream.noiseStream.userData)) - ) { - console.warn( - 'Passed an existing protocol stream to syncController.replicate(). Currently any pairing for the `hypercore/alpha` protocol is overwritten' - ) - } - const protocolStream = Hypercore.createProtocolStream(stream, { - ondiscoverykey: /** @param {Buffer} discoveryKey */ (discoveryKey) => { - return this.#coreManager.handleDiscoveryKey(discoveryKey, stream) - }, - }) - const protomux = - // Need to coerce this until we update Hypercore.createProtocolStream types - /** @type {import('protomux')} */ ( - protocolStream.noiseStream.userData - ) - if (!protomux) throw new Error('Invalid stream') - - if (this.#peerSyncControllers.has(protomux)) { - console.warn('Already replicating to this stream') - return - } + replicate(protomux) { + if (this.#peerSyncControllers.has(protomux)) return const peerSyncController = new PeerSyncController({ protomux, diff --git a/test-types/data-types.ts b/test-types/data-types.ts index 10b2fcb15..aa24bed80 100644 --- a/test-types/data-types.ts +++ b/test-types/data-types.ts @@ -36,7 +36,7 @@ const mapeoProject = new MapeoProject({ tables: [projectSettingsTable], sqlite, }), - rpc: new LocalPeers(), + localPeers: new LocalPeers(), }) ///// Observations From ab77e5119b2a5904d865f38cec58e1722c7061a3 Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Thu, 9 Nov 2023 15:58:59 +0900 Subject: [PATCH 2/4] feat: `listLocalPeers()` & `local-peers` event (#360) * WIP initial work * rename Rpc to LocalPeers * Handle deviceInfo internally, id -> deviceId * Tests for stream error handling * remove unnecessary constructor * return replication stream * Attach protomux instance to peer info * rename and re-organize * revert changes outside scope of PR * WIP initial work * Tie everything together * rename getProjectInstance * feat: client.listLocalPeers() & `local-peers` evt --- src/mapeo-manager.js | 52 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/src/mapeo-manager.js b/src/mapeo-manager.js index e37110e2e..999226620 100644 --- a/src/mapeo-manager.js +++ b/src/mapeo-manager.js @@ -27,6 +27,7 @@ import { RandomAccessFilePool } from './core-manager/random-access-file-pool.js' import { LocalPeers } from './local-peers.js' import { InviteApi } from './invite-api.js' import { LocalDiscovery } from './discovery/local-discovery.js' +import { TypedEmitter } from 'tiny-typed-emitter' /** @typedef {import("@mapeo/schema").ProjectSettingsValue} ProjectValue */ @@ -40,7 +41,19 @@ const MAX_FILE_DESCRIPTORS = 768 export const kRPC = Symbol('rpc') -export class MapeoManager { +/** + * @typedef {Omit} PublicPeerInfo + */ + +/** + * @typedef {object} MapeoManagerEvents + * @property {(peers: PublicPeerInfo[]) => void} local-peers Emitted when the list of connected peers changes (new ones added, or connection status changes) + */ + +/** + * @extends {TypedEmitter} + */ +export class MapeoManager extends TypedEmitter { #keyManager #projectSettingsIndexWriter #db @@ -62,6 +75,7 @@ export class MapeoManager { * @param {string | import('./types.js').CoreStorage} opts.coreStorage Folder for hypercore storage or a function that returns a RandomAccessStorage instance */ constructor({ rootKey, dbFolder, coreStorage }) { + super() this.#dbFolder = dbFolder const sqlite = new Database( dbFolder === ':memory:' @@ -74,6 +88,10 @@ export class MapeoManager { }) this.#localPeers = new LocalPeers() + this.#localPeers.on('peers', (peers) => { + this.emit('local-peers', omitPeerProtomux(peers)) + }) + this.#keyManager = new KeyManager(rootKey) this.#deviceId = getDeviceId(this.#keyManager) this.#projectSettingsIndexWriter = new IndexWriter({ @@ -456,4 +474,36 @@ export class MapeoManager { get invite() { return this.#invite } + + /** + * @returns {Promise} + */ + async listLocalPeers() { + return omitPeerProtomux(this.#localPeers.peers) + } +} + +// We use the `protomux` property of connected peers internally, but we don't +// expose it to the API. I have avoided using a private symbol for this for fear +// that we could accidentally keep references around of protomux instances, +// which could cause a memory leak (it shouldn't, but just to eliminate the +// possibility) + +/** + * Remove the protomux property of connected peers + * + * @param {import('./local-peers.js').PeerInfo[]} peers + * @returns {PublicPeerInfo[]} + */ +function omitPeerProtomux(peers) { + return peers.map( + ({ + // @ts-ignore + // eslint-disable-next-line no-unused-vars + protomux, + ...publicPeerInfo + }) => { + return publicPeerInfo + } + ) } From ca95b2f1105d850c483a9da69a4ed0e0c060ae87 Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Thu, 9 Nov 2023 16:10:05 +0900 Subject: [PATCH 3/4] feat: add `$sync` API methods (#361) * WIP initial work * rename Rpc to LocalPeers * Handle deviceInfo internally, id -> deviceId * Tests for stream error handling * remove unnecessary constructor * return replication stream * Attach protomux instance to peer info * rename and re-organize * revert changes outside scope of PR * WIP initial work * Tie everything together * rename getProjectInstance * feat: client.listLocalPeers() & `local-peers` evt * feat: add $sync API methods For now this simplifies the API (because we are only supporting local sync, not remote sync over the internet) to: - `project.$sync.getState()` - `project.$sync.start()` - `project.$sync.stop()` - Events - `sync-state` It's currently not possible to stop local discovery, nor is it possible to stop sync of the metadata namespaces (auth, config, blobIndex). The start and stop methods stop the sync of the data and blob namespaces. Fixes #134. Stacked on #360, #358 and #356. --- src/mapeo-manager.js | 12 ++++-- src/mapeo-project.js | 35 +++++++++++----- src/sync/sync-api.js | 82 +++++++++++++++++++++++++++++++++++++ src/sync/sync-controller.js | 44 -------------------- 4 files changed, 115 insertions(+), 58 deletions(-) create mode 100644 src/sync/sync-api.js delete mode 100644 src/sync/sync-controller.js diff --git a/src/mapeo-manager.js b/src/mapeo-manager.js index 999226620..2ac56b057 100644 --- a/src/mapeo-manager.js +++ b/src/mapeo-manager.js @@ -40,6 +40,7 @@ const CLIENT_SQLITE_FILE_NAME = 'client.db' const MAX_FILE_DESCRIPTORS = 768 export const kRPC = Symbol('rpc') +export const kManagerReplicate = Symbol('replicate manager') /** * @typedef {Omit} PublicPeerInfo @@ -129,7 +130,7 @@ export class MapeoManager extends TypedEmitter { this.#localDiscovery = new LocalDiscovery({ identityKeypair: this.#keyManager.getIdentityKeypair(), }) - this.#localDiscovery.on('connection', this.replicate.bind(this)) + this.#localDiscovery.on('connection', this[kManagerReplicate].bind(this)) } /** @@ -140,12 +141,15 @@ export class MapeoManager extends TypedEmitter { } /** - * Replicate Mapeo to a `@hyperswarm/secret-stream`. Should only be used for - * local (trusted) connections, because the RPC channel key is public + * Replicate Mapeo to a `@hyperswarm/secret-stream`. This replication connects + * the Mapeo RPC channel and allows invites. All active projects will sync + * automatically to this replication stream. Only use for local (trusted) + * connections, because the RPC channel key is public. To sync a specific + * project without connecting RPC, use project[kProjectReplication]. * * @param {import('@hyperswarm/secret-stream')} noiseStream */ - replicate(noiseStream) { + [kManagerReplicate](noiseStream) { const replicationStream = this.#localPeers.connect(noiseStream) Promise.all([this.getDeviceInfo(), openedNoiseSecretStream(noiseStream)]) .then(([{ name }, openedNoiseStream]) => { diff --git a/src/mapeo-project.js b/src/mapeo-project.js index cf9805174..669922966 100644 --- a/src/mapeo-project.js +++ b/src/mapeo-project.js @@ -32,7 +32,7 @@ import { import { Capabilities } from './capabilities.js' import { getDeviceId, projectKeyToId, valueOf } from './utils.js' import { MemberApi } from './member-api.js' -import { SyncController } from './sync/sync-controller.js' +import { SyncApi, kSyncReplicate } from './sync/sync-api.js' import Hypercore from 'hypercore' /** @typedef {Omit} EditableProjectSettings */ @@ -42,7 +42,7 @@ const INDEXER_STORAGE_FOLDER_NAME = 'indexer' export const kCoreOwnership = Symbol('coreOwnership') export const kCapabilities = Symbol('capabilities') export const kSetOwnDeviceInfo = Symbol('kSetOwnDeviceInfo') -export const kReplicate = Symbol('replicate') +export const kProjectReplicate = Symbol('replicate project') export class MapeoProject { #projectId @@ -56,7 +56,7 @@ export class MapeoProject { #capabilities #ownershipWriteDone #memberApi - #syncController + #syncApi /** * @param {Object} opts @@ -245,15 +245,17 @@ export class MapeoProject { }, }) - this.#syncController = new SyncController({ + this.#syncApi = new SyncApi({ coreManager: this.#coreManager, capabilities: this.#capabilities, }) + ///////// 4. Wire up sync + // Replicate already connected local peers for (const peer of localPeers.peers) { if (peer.status !== 'connected') continue - this.#syncController.replicate(peer.protomux) + this.#syncApi[kSyncReplicate](peer.protomux) } localPeers.on('discovery-key', (discoveryKey, stream) => { @@ -267,10 +269,10 @@ export class MapeoProject { // When a new peer is found, try to replicate (if it is not a member of the // project it will fail the capability check and be ignored) localPeers.on('peer-add', (peer) => { - this.#syncController.replicate(peer.protomux) + this.#syncApi[kSyncReplicate](peer.protomux) }) - ///////// 4. Write core ownership record + ///////// 5. Write core ownership record const deferred = pDefer() // Avoid uncaught rejection. If this is rejected then project.ready() will reject @@ -365,6 +367,10 @@ export class MapeoProject { return this.#memberApi } + get $sync() { + return this.#syncApi + } + /** * @param {Partial} settings * @returns {Promise} @@ -416,15 +422,24 @@ export class MapeoProject { } /** + * Replicate a project to a @hyperswarm/secret-stream. Invites will not + * function because the RPC channel is not connected for project replication, + * and only this project will replicate (to replicate multiple projects you + * need to replicate the manager instance via manager[kManagerReplicate]) * * @param {Exclude[0], boolean>} stream A duplex stream, a @hyperswarm/secret-stream, or a Protomux instance * @returns */ - [kReplicate](stream) { - const replicationStream = Hypercore.createProtocolStream(stream, {}) + [kProjectReplicate](stream) { + const replicationStream = Hypercore.createProtocolStream(stream, { + ondiscoverykey: async (discoveryKey) => { + this.#coreManager.handleDiscoveryKey(discoveryKey, replicationStream) + }, + }) const protomux = replicationStream.noiseStream.userData // @ts-ignore - got fed up jumping through hoops to keep TS heppy - return this.#syncController.replicate(protomux) + this.#syncApi[kSyncReplicate](protomux) + return replicationStream } /** diff --git a/src/sync/sync-api.js b/src/sync/sync-api.js new file mode 100644 index 000000000..9508adc13 --- /dev/null +++ b/src/sync/sync-api.js @@ -0,0 +1,82 @@ +import { TypedEmitter } from 'tiny-typed-emitter' +import { SyncState } from './sync-state.js' +import { PeerSyncController } from './peer-sync-controller.js' + +export const kSyncReplicate = Symbol('replicate sync') + +/** + * @typedef {object} SyncEvents + * @property {(syncState: import('./sync-state.js').State) => void} sync-state + */ + +/** + * @extends {TypedEmitter} + */ +export class SyncApi extends TypedEmitter { + syncState + #coreManager + #capabilities + /** @type {Map} */ + #peerSyncControllers = new Map() + /** @type {Set<'local' | 'remote'>} */ + #dataSyncEnabled = new Set() + + /** + * + * @param {object} opts + * @param {import('../core-manager/index.js').CoreManager} opts.coreManager + * @param {import("../capabilities.js").Capabilities} opts.capabilities + * @param {number} [opts.throttleMs] + */ + constructor({ coreManager, throttleMs = 200, capabilities }) { + super() + this.#coreManager = coreManager + this.#capabilities = capabilities + this.syncState = new SyncState({ coreManager, throttleMs }) + this.syncState.on('state', this.emit.bind(this, 'sync-state')) + } + + getState() { + return this.syncState.getState() + } + + /** + * Start syncing data cores + */ + start() { + if (this.#dataSyncEnabled.has('local')) return + this.#dataSyncEnabled.add('local') + for (const peerSyncController of this.#peerSyncControllers.values()) { + peerSyncController.enableDataSync() + } + } + + /** + * Stop syncing data cores (metadata cores will continue syncing in the background) + */ + stop() { + if (!this.#dataSyncEnabled.has('local')) return + this.#dataSyncEnabled.delete('local') + for (const peerSyncController of this.#peerSyncControllers.values()) { + peerSyncController.disableDataSync() + } + } + + /** + * @param {import('protomux')} protomux A protomux instance + */ + [kSyncReplicate](protomux) { + if (this.#peerSyncControllers.has(protomux)) return + + const peerSyncController = new PeerSyncController({ + protomux, + coreManager: this.#coreManager, + syncState: this.syncState, + capabilities: this.#capabilities, + }) + if (this.#dataSyncEnabled.has('local')) { + peerSyncController.enableDataSync() + } + this.#peerSyncControllers.set(protomux, peerSyncController) + } +} diff --git a/src/sync/sync-controller.js b/src/sync/sync-controller.js deleted file mode 100644 index a32b6cb95..000000000 --- a/src/sync/sync-controller.js +++ /dev/null @@ -1,44 +0,0 @@ -import { TypedEmitter } from 'tiny-typed-emitter' -import { SyncState } from './sync-state.js' -import { PeerSyncController } from './peer-sync-controller.js' - -export class SyncController extends TypedEmitter { - #syncState - #coreManager - #capabilities - /** @type {Map} */ - #peerSyncControllers = new Map() - - /** - * - * @param {object} opts - * @param {import('../core-manager/index.js').CoreManager} opts.coreManager - * @param {import("../capabilities.js").Capabilities} opts.capabilities - * @param {number} [opts.throttleMs] - */ - constructor({ coreManager, throttleMs = 200, capabilities }) { - super() - this.#coreManager = coreManager - this.#capabilities = capabilities - this.#syncState = new SyncState({ coreManager, throttleMs }) - } - - getState() { - return this.#syncState.getState() - } - - /** - * @param {import('protomux')} protomux A protomux instance - */ - replicate(protomux) { - if (this.#peerSyncControllers.has(protomux)) return - - const peerSyncController = new PeerSyncController({ - protomux, - coreManager: this.#coreManager, - syncState: this.#syncState, - capabilities: this.#capabilities, - }) - this.#peerSyncControllers.set(protomux, peerSyncController) - } -} From 75ea34aa61d159f4a3d23afe182134e918cd603a Mon Sep 17 00:00:00 2001 From: Andrew Chou Date: Thu, 9 Nov 2023 02:32:49 -0500 Subject: [PATCH 4/4] fix: fix core storage initialization in MapeoManager (#367) Co-authored-by: Gregor MacLennan --- src/mapeo-manager.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mapeo-manager.js b/src/mapeo-manager.js index 2ac56b057..f7659849d 100644 --- a/src/mapeo-manager.js +++ b/src/mapeo-manager.js @@ -121,7 +121,7 @@ export class MapeoManager extends TypedEmitter { if (typeof coreStorage === 'string') { const pool = new RandomAccessFilePool(MAX_FILE_DESCRIPTORS) - // @ts-ignore + // @ts-expect-error this.#coreStorage = Hypercore.defaultStorage(coreStorage, { pool }) } else { this.#coreStorage = coreStorage