Skip to content

Commit

Permalink
feat: check capabilities before send keys & haves
Browse files Browse the repository at this point in the history
  • Loading branch information
gmaclennan committed Nov 27, 2023
1 parent 4ed0928 commit da057bd
Show file tree
Hide file tree
Showing 14 changed files with 495 additions and 639 deletions.
8 changes: 1 addition & 7 deletions proto/extensions.proto
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
syntax = "proto3";

message ProjectExtension {
repeated bytes wantCoreKeys = 1;

repeated bytes authCoreKeys = 2;
repeated bytes configCoreKeys = 3;
repeated bytes dataCoreKeys = 4;
repeated bytes blobIndexCoreKeys = 5;
repeated bytes blobCoreKeys = 6;
repeated bytes authCoreKeys = 1;
}

message HaveExtension {
Expand Down
228 changes: 46 additions & 182 deletions src/core-manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ import { HaveExtension, ProjectExtension } from '../generated/extensions.js'
import { CoreIndex } from './core-index.js'
import * as rle from './bitfield-rle.js'
import { Logger } from '../logger.js'
import { keyToId } from '../utils.js'
import { discoveryKey } from 'hypercore-crypto'
import Hypercore from 'hypercore'

export const kCoreManagerReplicate = Symbol('replicate core manager')
// WARNING: Changing these will break things for existing apps, since namespaces
Expand Down Expand Up @@ -55,12 +52,6 @@ export class CoreManager extends TypedEmitter {
#haveExtension
#deviceId
#l
/**
* We use this to reduce network traffic caused by requesting the same key
* from multiple clients.
* TODO: Remove items from this set after a max age
*/
#keyRequests = new TrackedKeyRequests()
#autoDownload

static get namespaces() {
Expand Down Expand Up @@ -155,8 +146,8 @@ export class CoreManager extends TypedEmitter {
'mapeo/project',
{
encoding: ProjectExtensionCodec,
onmessage: (msg, peer) => {
this.#handleProjectMessage(msg, peer)
onmessage: (msg) => {
this.#handleProjectMessage(msg)
},
}
)
Expand All @@ -168,16 +159,6 @@ export class CoreManager extends TypedEmitter {
},
})

this.#creatorCore.on('peer-add', (peer) => {
this.#sendHaves(peer)
})
this.#creatorCore.on('peer-remove', (peer) => {
// When a peer is removed we clean up any unanswered key requests, so that
// we will request from a different peer, and to avoid the tracking of key
// requests growing without bounds.
this.#keyRequests.deleteByPeerKey(peer.remotePublicKey)
})

this.#ready = Promise.all(
[...this.#coreIndex].map(({ core }) => core.ready())
)
Expand Down Expand Up @@ -253,7 +234,6 @@ export class CoreManager extends TypedEmitter {
*/
async close() {
this.#state = 'closing'
this.#keyRequests.clear()
const promises = []
for (const { core } of this.#coreIndex) {
promises.push(core.close())
Expand Down Expand Up @@ -342,69 +322,51 @@ export class CoreManager extends TypedEmitter {
}

/**
* Send an extension message over the project creator core replication stream
* requesting a core key for the given discovery key.
*
* @param {Buffer} peerKey
* @param {Buffer} discoveryKey
* @param {ProjectExtension} msg
*/
requestCoreKey(peerKey, discoveryKey) {
// No-op if we already have this core
if (this.getCoreByDiscoveryKey(discoveryKey)) return
const peer = this.#creatorCore.peers.find((peer) => {
return peer.remotePublicKey.equals(peerKey)
})
if (!peer) {
// This should not happen because this is only called from SyncApi, which
// checks the peer exists before calling this method.
this.#l.log(
'Attempted to request core key for %h, but no connected peer %h',
discoveryKey,
peerKey
)
return
#handleProjectMessage({ authCoreKeys }) {
for (const coreKey of authCoreKeys) {
// Use public method - these must be persisted (private method defaults to persisted=false)
this.addCore(coreKey, 'auth')
}
// Only request a key once, e.g. from the peer we first receive it from (we
// can assume that a peer must have the key if we see the discovery key in
// the protomux). This is necessary to reduce network traffic for many newly
// connected peers - otherwise duplicate requests will be sent to every peer
if (this.#keyRequests.has(discoveryKey)) return
this.#keyRequests.set(discoveryKey, peerKey)
}

this.#l.log(
'Requesting core key for discovery key %h from peer %h',
discoveryKey,
peerKey
)
const message = ProjectExtension.fromPartial({
wantCoreKeys: [discoveryKey],
})
this.#projectExtension.send(message, peer)
/**
* Sends auth core keys to the given peer, skipping any keys that we know the
* peer has already (depends on the peer having already replicated the auth
* cores it has)
*
* @param {any} peer
*/
sendAuthCoreKeys(peer) {
this.#sendCoreKeys(peer, ['auth'])
}

/**
* @param {ProjectExtension} msg
* We only send non-auth core keys to a peer for unit tests
* @param {any} peer
* @param {Readonly<Namespace[]>} namespaces
*/
#handleProjectMessage({ wantCoreKeys, ...coreKeys }, peer) {
#sendCoreKeys(peer, namespaces) {
const message = ProjectExtension.create()
let hasKeys = false
for (const discoveryKey of wantCoreKeys) {
const coreRecord = this.getCoreByDiscoveryKey(discoveryKey)
if (!coreRecord) continue
message[`${coreRecord.namespace}CoreKeys`].push(coreRecord.key)
hasKeys = true
for (const ns of namespaces) {
for (const { core, key } of this.getCores(ns)) {
let peerHasKeyAlready = false
for (const peer of core.peers) {
if (peer.remotePublicKey.equals(peer.remotePublicKey)) {
peerHasKeyAlready = true
break
}
}
if (peerHasKeyAlready) continue
message.authCoreKeys.push(key)
hasKeys = true
}
}
if (hasKeys) {
this.#projectExtension.send(message, peer)
}
for (const namespace of NAMESPACES) {
for (const coreKey of coreKeys[`${namespace}CoreKeys`]) {
// Use public method - these must be persisted (private method defaults to persisted=false)
this.addCore(coreKey, namespace)
this.#keyRequests.deleteByDiscoveryKey(discoveryKey(coreKey))
}
}
}

/**
Expand All @@ -426,21 +388,17 @@ export class CoreManager extends TypedEmitter {
}

/**
*
* @param {any} peer
* @param {Exclude<Namespace, 'auth'>} namespace
*/
async #sendHaves(peer) {
if (!peer) {
console.warn('sendHaves no peer', peer.remotePublicKey)
// TODO: How to handle this and when does it happen?
return
}
async sendHaves(peer, namespace) {
// We want ready() rather than update() because we are only interested in
// local data. This waits for all cores to be ready.
await this.ready()

peer.protomux.cork()

for (const { core, namespace } of this.#coreIndex) {
// We want ready() rather than update() because we are only interested in local data
await core.ready()
for (const { core } of this.getCores(namespace)) {
if (core.length === 0) continue
const { discoveryKey } = core
// This will always be defined after ready(), but need to let TS know
Expand All @@ -465,18 +423,14 @@ export class CoreManager extends TypedEmitter {
* @returns
*/
[kCoreManagerReplicate](stream) {
const protocolStream = Hypercore.createProtocolStream(stream, {
ondiscoverykey: async (discoveryKey) => {
const peer = await findPeer(
this.creatorCore,
// @ts-ignore
protocolStream.noiseStream.remotePublicKey
)
if (!peer) return
this.requestCoreKey(peer.remotePublicKey, discoveryKey)
},
const protocolStream = this.#corestore.replicate(stream)
this.#creatorCore.on('peer-add', (peer) => {
// Normally only auth core keys are sent, but for unit tests we need to
// send all of them, because unit tests don't include the Sync API which
// adds cores from core ownership records.
this.#sendCoreKeys(peer, NAMESPACES)
})
return this.#corestore.replicate(stream)
return protocolStream
}
}

Expand Down Expand Up @@ -523,93 +477,3 @@ const HaveExtensionCodec = {
}
},
}

class TrackedKeyRequests {
/** @type {Map<string, string>} */
#byDiscoveryId = new Map()
/** @type {Map<string, Set<string>>} */
#byPeerId = new Map()

/**
* @param {Buffer} discoveryKey
* @param {Buffer} peerKey
*/
set(discoveryKey, peerKey) {
const discoveryId = keyToId(discoveryKey)
const peerId = keyToId(peerKey)
const existingForPeer = this.#byPeerId.get(peerId) || new Set()
this.#byDiscoveryId.set(discoveryId, peerId)
existingForPeer.add(discoveryId)
this.#byPeerId.set(peerId, existingForPeer)
return this
}
/**
* @param {Buffer} discoveryKey
*/
has(discoveryKey) {
const discoveryId = keyToId(discoveryKey)
return this.#byDiscoveryId.has(discoveryId)
}
/**
* @param {Buffer} discoveryKey
*/
deleteByDiscoveryKey(discoveryKey) {
const discoveryId = keyToId(discoveryKey)
const peerId = this.#byDiscoveryId.get(discoveryId)
if (!peerId) return false
this.#byDiscoveryId.delete(discoveryId)
const existingForPeer = this.#byPeerId.get(peerId)
if (existingForPeer) {
existingForPeer.delete(discoveryId)
}
return true
}
/**
* @param {Buffer} peerKey
*/
deleteByPeerKey(peerKey) {
const peerId = keyToId(peerKey)
const existingForPeer = this.#byPeerId.get(peerId)
if (!existingForPeer) return
for (const discoveryId of existingForPeer) {
this.#byDiscoveryId.delete(discoveryId)
}
this.#byPeerId.delete(peerId)
}
clear() {
this.#byDiscoveryId.clear()
this.#byPeerId.clear()
}
}

/**
* @param {Hypercore<"binary", Buffer>} core
* @param {Buffer} publicKey
* @param {{ timeout?: number }} [opts]
*/
function findPeer(core, publicKey, { timeout = 200 } = {}) {
const peer = core.peers.find((peer) => {
return peer.remotePublicKey.equals(publicKey)
})
if (peer) return peer
// This is called from the from the handleDiscoveryId event, which can
// happen before the peer connection is fully established, so we wait for
// the `peer-add` event, with a timeout in case the peer never gets added
return new Promise(function (res) {
const timeoutId = setTimeout(function () {
core.off('peer-add', onPeer)
res(null)
}, timeout)

core.on('peer-add', onPeer)

/** @param {any} peer */
function onPeer(peer) {
if (peer.remotePublicKey.equals(publicKey)) {
clearTimeout(timeoutId)
core.off('peer-add', onPeer)
res(peer)
}
}
})
}
12 changes: 9 additions & 3 deletions src/core-ownership.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,23 @@ export class CoreOwnership {
}

/**
*
* @param {string} deviceId
* @param {typeof NAMESPACES[number]} namespace
* @returns {Promise<string>} coreId of core belonging to `deviceId` for `namespace`
*/
async getCoreId(deviceId, namespace) {
await this.#ownershipWriteDone
const result = await this.#dataType.getByDocId(deviceId)
const result = await this.get(deviceId)
return result[`${namespace}CoreId`]
}

/**
* @param {string} deviceId
*/
async get(deviceId) {
await this.#ownershipWriteDone
return this.#dataType.getByDocId(deviceId)
}

/**
*
* @param {import('./types.js').KeyPair} identityKeypair
Expand Down
5 changes: 1 addition & 4 deletions src/datastore/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { TypedEmitter } from 'tiny-typed-emitter'
import { encode, decode, getVersionId, parseVersionId } from '@mapeo/schema'
import MultiCoreIndexer from 'multi-core-indexer'
import pDefer from 'p-defer'
Expand Down Expand Up @@ -30,9 +29,8 @@ const NAMESPACE_SCHEMAS = /** @type {const} */ ({
/**
* @template {keyof NamespaceSchemas} [TNamespace=keyof NamespaceSchemas]
* @template {NamespaceSchemas[TNamespace][number]} [TSchemaName=NamespaceSchemas[TNamespace][number]]
* @extends {TypedEmitter}
*/
export class DataStore extends TypedEmitter {
export class DataStore {
#coreManager
#namespace
#batch
Expand All @@ -51,7 +49,6 @@ export class DataStore extends TypedEmitter {
* @param {MultiCoreIndexer.StorageParam} opts.storage
*/
constructor({ coreManager, namespace, batch, storage }) {
super()
this.#coreManager = coreManager
this.#namespace = namespace
this.#batch = batch
Expand Down
Loading

0 comments on commit da057bd

Please sign in to comment.