From 0785c5604c95a39bfdd981e0863ccca8ef78224b Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Tue, 24 Oct 2023 17:54:12 +0900 Subject: [PATCH] add sync controller --- src/sync/sync-controller.js | 71 +++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 src/sync/sync-controller.js diff --git a/src/sync/sync-controller.js b/src/sync/sync-controller.js new file mode 100644 index 000000000..f68d2f6e1 --- /dev/null +++ b/src/sync/sync-controller.js @@ -0,0 +1,71 @@ +import Hypercore from 'hypercore' +import { TypedEmitter } from 'tiny-typed-emitter' +import Protomux from 'protomux' +import { SyncState } from './sync-state.js' +import { PeerSyncController } from './peer-sync-controller.js' + +export class SyncController extends TypedEmitter { + #syncState + #coreManager + #capabilities + /** @type {Map} */ + #peerSyncControllers = new Map() + + /** + * + * @param {object} opts + * @param {import('../core-manager/index.js').CoreManager} opts.coreManager + * @param {import("../capabilities.js").Capabilities} opts.capabilities + * @param {number} [opts.throttleMs] + */ + constructor({ coreManager, throttleMs = 200, capabilities }) { + super() + this.#coreManager = coreManager + this.#capabilities = capabilities + this.#syncState = new SyncState({ coreManager, throttleMs }) + } + + getState() { + return this.#syncState.getState() + } + + /** + * @param {Exclude[0], boolean>} stream A duplex stream, a @hyperswarm/secret-stream, or a Protomux instance + */ + replicate(stream) { + if ( + Protomux.isProtomux(stream) || + ('userData' in stream && Protomux.isProtomux(stream.userData)) || + ('noiseStream' in stream && + Protomux.isProtomux(stream.noiseStream.userData)) + ) { + console.warn( + 'Passed an existing protocol stream to syncController.replicate(). Currently any pairing for the `hypercore/alpha` protocol is overwritten' + ) + } + const protocolStream = Hypercore.createProtocolStream(stream, { + ondiscoverykey: /** @param {Buffer} discoveryKey */ (discoveryKey) => { + return this.#coreManager.handleDiscoveryKey(discoveryKey, stream) + }, + }) + const protomux = + // Need to coerce this until we update Hypercore.createProtocolStream types + /** @type {import('protomux')} */ ( + protocolStream.noiseStream.userData + ) + if (!protomux) throw new Error('Invalid stream') + + if (this.#peerSyncControllers.has(protomux)) { + console.warn('Already replicating to this stream') + return + } + + const peerSyncController = new PeerSyncController({ + protomux, + coreManager: this.#coreManager, + syncState: this.#syncState, + capabilities: this.#capabilities, + }) + this.#peerSyncControllers.set(protomux, peerSyncController) + } +}