Skip to content

Commit

Permalink
feat: only send/receive core keys with permission (#783)
Browse files Browse the repository at this point in the history
Previously, we would add cores without checking anything.

Now, we only add them once (1) their role is OK (2) they have a good
core ownership record.

Partly addresses [#268].

[#268]: #268

Co-Authored-By: Gregor MacLennan <[email protected]>
  • Loading branch information
EvanHahn and gmaclennan authored Aug 28, 2024
1 parent 4182e9f commit ca9b7d4
Show file tree
Hide file tree
Showing 14 changed files with 212 additions and 611 deletions.
8 changes: 1 addition & 7 deletions proto/extensions.proto
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
syntax = "proto3";

message ProjectExtension {
repeated bytes wantCoreKeys = 1;

repeated bytes authCoreKeys = 2;
repeated bytes configCoreKeys = 3;
repeated bytes dataCoreKeys = 4;
repeated bytes blobIndexCoreKeys = 5;
repeated bytes blobCoreKeys = 6;
repeated bytes authCoreKeys = 1;
}

message HaveExtension {
Expand Down
199 changes: 26 additions & 173 deletions src/core-manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@ import Corestore from 'corestore'
import { debounce } from 'throttle-debounce'
import assert from 'node:assert/strict'
import { sql, eq } from 'drizzle-orm'
import { discoveryKey } from 'hypercore-crypto'
import Hypercore from 'hypercore'

import { HaveExtension, ProjectExtension } from '../generated/extensions.js'
import { Logger } from '../logger.js'
import { NAMESPACES } from '../constants.js'
import { keyToId, noop } from '../utils.js'
import { noop } from '../utils.js'
import { coresTable } from '../schema/project.js'
import * as rle from './bitfield-rle.js'
import { CoreIndex } from './core-index.js'

/** @import Hypercore from 'hypercore' */
/** @import { HypercorePeer, Namespace } from '../types.js' */

const WRITER_CORE_PREHAVES_DEBOUNCE_DELAY = 1000
Expand Down Expand Up @@ -46,12 +45,6 @@ export class CoreManager extends TypedEmitter {
#haveExtension
#deviceId
#l
/**
* We use this to reduce network traffic caused by requesting the same key
* from multiple clients.
* TODO: Remove items from this set after a max age
*/
#keyRequests = new TrackedKeyRequests()
#autoDownload

static get namespaces() {
Expand Down Expand Up @@ -152,8 +145,8 @@ export class CoreManager extends TypedEmitter {
'mapeo/project',
{
encoding: ProjectExtensionCodec,
onmessage: (msg, peer) => {
this.#handleProjectMessage(msg, peer)
onmessage: (msg) => {
this.#handleProjectMessage(msg)
},
}
)
Expand All @@ -169,12 +162,7 @@ export class CoreManager extends TypedEmitter {
this.#sendHaves(peer, this.#coreIndex).catch(() => {
this.#l.log('Failed to send pre-haves to newly-connected peer')
})
})
this.#creatorCore.on('peer-remove', (peer) => {
// When a peer is removed we clean up any unanswered key requests, so that
// we will request from a different peer, and to avoid the tracking of key
// requests growing without bounds.
this.#keyRequests.deleteByPeerKey(peer.remotePublicKey)
this.#sendAuthCoreKeys(peer)
})

this.#ready = Promise.all(
Expand Down Expand Up @@ -251,7 +239,6 @@ export class CoreManager extends TypedEmitter {
*/
async close() {
this.#state = 'closing'
this.#keyRequests.clear()
const promises = []
for (const { core } of this.#coreIndex) {
promises.push(core.close())
Expand All @@ -268,6 +255,7 @@ export class CoreManager extends TypedEmitter {
* @returns {CoreRecord}
*/
addCore(key, namespace) {
this.#l.log('Adding remote core %k to %s', key, namespace)
return this.#addCore({ publicKey: key }, namespace, true)
}

Expand Down Expand Up @@ -359,69 +347,31 @@ export class CoreManager extends TypedEmitter {
}

/**
* Send an extension message over the project creator core replication stream
* requesting a core key for the given discovery key.
* We only add auth cores from the project extension messages. Cores in other
* namespaces are added by the sync API from the core ownership docs
*
* @param {Buffer} peerKey
* @param {Buffer} discoveryKey
* @param {ProjectExtension} msg
*/
requestCoreKey(peerKey, discoveryKey) {
// No-op if we already have this core
if (this.getCoreByDiscoveryKey(discoveryKey)) return
const peer = this.#creatorCore.peers.find((peer) => {
return peer.remotePublicKey.equals(peerKey)
})
if (!peer) {
// This should not happen because this is only called from SyncApi, which
// checks the peer exists before calling this method.
this.#l.log(
'Attempted to request core key for %h, but no connected peer %h',
discoveryKey,
peerKey
)
return
#handleProjectMessage({ authCoreKeys }) {
for (const authCoreKey of authCoreKeys) {
// Use public method - these must be persisted (private method defaults to persisted=false)
this.addCore(authCoreKey, 'auth')
}
// Only request a key once, e.g. from the peer we first receive it from (we
// can assume that a peer must have the key if we see the discovery key in
// the protomux). This is necessary to reduce network traffic for many newly
// connected peers - otherwise duplicate requests will be sent to every peer
if (this.#keyRequests.has(discoveryKey)) return
this.#keyRequests.set(discoveryKey, peerKey)

this.#l.log(
'Requesting core key for discovery key %h from peer %h',
discoveryKey,
peerKey
)
const message = ProjectExtension.fromPartial({
wantCoreKeys: [discoveryKey],
})
this.#projectExtension.send(message, peer)
}

/**
* @param {ProjectExtension} msg
* Sends auth core keys to the given peer, skipping any keys that we know the
* peer has already (depends on the peer having already replicated the auth
* cores it has)
*
* @param {HypercorePeer} peer
*/
#handleProjectMessage({ wantCoreKeys, ...coreKeys }, peer) {
#sendAuthCoreKeys(peer) {
const message = ProjectExtension.create()
let hasKeys = false
for (const discoveryKey of wantCoreKeys) {
const coreRecord = this.getCoreByDiscoveryKey(discoveryKey)
if (!coreRecord) continue
message[`${coreRecord.namespace}CoreKeys`].push(coreRecord.key)
hasKeys = true
}
if (hasKeys) {
this.#projectExtension.send(message, peer)
}
for (const namespace of NAMESPACES) {
for (const coreKey of coreKeys[`${namespace}CoreKeys`]) {
// Use public method - these must be persisted (private method defaults to persisted=false)
this.addCore(coreKey, namespace)
this.#keyRequests.deleteByDiscoveryKey(discoveryKey(coreKey))
}
for (const { key } of this.getCores('auth')) {
message.authCoreKeys.push(key)
}
this.#projectExtension.send(message, peer)
}

/**
Expand Down Expand Up @@ -478,21 +428,14 @@ export class CoreManager extends TypedEmitter {
* ONLY FOR TESTING
* Replicate all cores in core manager
*
* NB: Remote peers need to be manually added when unit testing core manager
* without the Sync API
*
* @param {Parameters<Corestore['replicate']>[0]} stream
*/
[kCoreManagerReplicate](stream) {
const protocolStream = Hypercore.createProtocolStream(stream, {
ondiscoverykey: async (discoveryKey) => {
const peer = await findPeer(
this.creatorCore,
// @ts-ignore
protocolStream.noiseStream.remotePublicKey
)
if (!peer) return
this.requestCoreKey(peer.remotePublicKey, discoveryKey)
},
})
return this.#corestore.replicate(stream)
const protocolStream = this.#corestore.replicate(stream)
return protocolStream
}

/**
Expand Down Expand Up @@ -559,93 +502,3 @@ const HaveExtensionCodec = {
}
},
}

class TrackedKeyRequests {
/** @type {Map<string, string>} */
#byDiscoveryId = new Map()
/** @type {Map<string, Set<string>>} */
#byPeerId = new Map()

/**
* @param {Buffer} discoveryKey
* @param {Buffer} peerKey
*/
set(discoveryKey, peerKey) {
const discoveryId = keyToId(discoveryKey)
const peerId = keyToId(peerKey)
const existingForPeer = this.#byPeerId.get(peerId) || new Set()
this.#byDiscoveryId.set(discoveryId, peerId)
existingForPeer.add(discoveryId)
this.#byPeerId.set(peerId, existingForPeer)
return this
}
/**
* @param {Buffer} discoveryKey
*/
has(discoveryKey) {
const discoveryId = keyToId(discoveryKey)
return this.#byDiscoveryId.has(discoveryId)
}
/**
* @param {Buffer} discoveryKey
*/
deleteByDiscoveryKey(discoveryKey) {
const discoveryId = keyToId(discoveryKey)
const peerId = this.#byDiscoveryId.get(discoveryId)
if (!peerId) return false
this.#byDiscoveryId.delete(discoveryId)
const existingForPeer = this.#byPeerId.get(peerId)
if (existingForPeer) {
existingForPeer.delete(discoveryId)
}
return true
}
/**
* @param {Buffer} peerKey
*/
deleteByPeerKey(peerKey) {
const peerId = keyToId(peerKey)
const existingForPeer = this.#byPeerId.get(peerId)
if (!existingForPeer) return
for (const discoveryId of existingForPeer) {
this.#byDiscoveryId.delete(discoveryId)
}
this.#byPeerId.delete(peerId)
}
clear() {
this.#byDiscoveryId.clear()
this.#byPeerId.clear()
}
}

/**
* @param {Hypercore<"binary", Buffer>} core
* @param {Buffer} publicKey
* @param {{ timeout?: number }} [opts]
*/
function findPeer(core, publicKey, { timeout = 200 } = {}) {
const peer = core.peers.find((peer) => {
return peer.remotePublicKey.equals(publicKey)
})
if (peer) return peer
// This is called from the from the handleDiscoveryId event, which can
// happen before the peer connection is fully established, so we wait for
// the `peer-add` event, with a timeout in case the peer never gets added
return new Promise(function (res) {
const timeoutId = setTimeout(function () {
core.off('peer-add', onPeer)
res(null)
}, timeout)

core.on('peer-add', onPeer)

/** @param {HypercorePeer} peer */
function onPeer(peer) {
if (peer.remotePublicKey.equals(publicKey)) {
clearTimeout(timeoutId)
core.off('peer-add', onPeer)
res(peer)
}
}
})
}
5 changes: 5 additions & 0 deletions src/core-ownership.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ export class CoreOwnership extends TypedEmitter {
return this.#dataType.getByDocId(deviceId)
}

async getAll() {
await this.#ownershipWriteDone
return this.#dataType.getMany()
}

/**
*
* @param {KeyPair} identityKeypair
Expand Down
73 changes: 15 additions & 58 deletions src/generated/extensions.d.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
/// <reference types="node" />
import _m0 from "protobufjs/minimal.js";
export interface ProjectExtension {
wantCoreKeys: Buffer[];
authCoreKeys: Buffer[];
configCoreKeys: Buffer[];
dataCoreKeys: Buffer[];
blobIndexCoreKeys: Buffer[];
blobCoreKeys: Buffer[];
}
export interface HaveExtension {
discoveryKey: Buffer;
Expand All @@ -28,60 +22,23 @@ export declare function haveExtension_NamespaceToNumber(object: HaveExtension_Na
export declare const ProjectExtension: {
encode(message: ProjectExtension, writer?: _m0.Writer): _m0.Writer;
decode(input: _m0.Reader | Uint8Array, length?: number): ProjectExtension;
create<I extends {
wantCoreKeys?: Buffer[];
authCoreKeys?: Buffer[];
configCoreKeys?: Buffer[];
dataCoreKeys?: Buffer[];
blobIndexCoreKeys?: Buffer[];
blobCoreKeys?: Buffer[];
} & {
wantCoreKeys?: Buffer[] & Buffer[] & { [K in Exclude<keyof I["wantCoreKeys"], keyof Buffer[]>]: never; };
authCoreKeys?: Buffer[] & Buffer[] & { [K_1 in Exclude<keyof I["authCoreKeys"], keyof Buffer[]>]: never; };
configCoreKeys?: Buffer[] & Buffer[] & { [K_2 in Exclude<keyof I["configCoreKeys"], keyof Buffer[]>]: never; };
dataCoreKeys?: Buffer[] & Buffer[] & { [K_3 in Exclude<keyof I["dataCoreKeys"], keyof Buffer[]>]: never; };
blobIndexCoreKeys?: Buffer[] & Buffer[] & { [K_4 in Exclude<keyof I["blobIndexCoreKeys"], keyof Buffer[]>]: never; };
blobCoreKeys?: Buffer[] & Buffer[] & { [K_5 in Exclude<keyof I["blobCoreKeys"], keyof Buffer[]>]: never; };
} & { [K_6 in Exclude<keyof I, keyof ProjectExtension>]: never; }>(base?: I): ProjectExtension;
fromPartial<I_1 extends {
wantCoreKeys?: Buffer[];
authCoreKeys?: Buffer[];
configCoreKeys?: Buffer[];
dataCoreKeys?: Buffer[];
blobIndexCoreKeys?: Buffer[];
blobCoreKeys?: Buffer[];
} & {
wantCoreKeys?: Buffer[] & Buffer[] & { [K_7 in Exclude<keyof I_1["wantCoreKeys"], keyof Buffer[]>]: never; };
authCoreKeys?: Buffer[] & Buffer[] & { [K_8 in Exclude<keyof I_1["authCoreKeys"], keyof Buffer[]>]: never; };
configCoreKeys?: Buffer[] & Buffer[] & { [K_9 in Exclude<keyof I_1["configCoreKeys"], keyof Buffer[]>]: never; };
dataCoreKeys?: Buffer[] & Buffer[] & { [K_10 in Exclude<keyof I_1["dataCoreKeys"], keyof Buffer[]>]: never; };
blobIndexCoreKeys?: Buffer[] & Buffer[] & { [K_11 in Exclude<keyof I_1["blobIndexCoreKeys"], keyof Buffer[]>]: never; };
blobCoreKeys?: Buffer[] & Buffer[] & { [K_12 in Exclude<keyof I_1["blobCoreKeys"], keyof Buffer[]>]: never; };
} & { [K_13 in Exclude<keyof I_1, keyof ProjectExtension>]: never; }>(object: I_1): ProjectExtension;
create<I extends Exact<DeepPartial<ProjectExtension>, I>>(base?: I): ProjectExtension;
fromPartial<I extends Exact<DeepPartial<ProjectExtension>, I>>(object: I): ProjectExtension;
};
export declare const HaveExtension: {
encode(message: HaveExtension, writer?: _m0.Writer): _m0.Writer;
decode(input: _m0.Reader | Uint8Array, length?: number): HaveExtension;
create<I extends {
discoveryKey?: Buffer;
start?: number;
encodedBitfield?: Buffer;
namespace?: HaveExtension_Namespace;
} & {
discoveryKey?: Buffer;
start?: number;
encodedBitfield?: Buffer;
namespace?: HaveExtension_Namespace;
} & { [K in Exclude<keyof I, keyof HaveExtension>]: never; }>(base?: I): HaveExtension;
fromPartial<I_1 extends {
discoveryKey?: Buffer;
start?: number;
encodedBitfield?: Buffer;
namespace?: HaveExtension_Namespace;
} & {
discoveryKey?: Buffer;
start?: number;
encodedBitfield?: Buffer;
namespace?: HaveExtension_Namespace;
} & { [K_1 in Exclude<keyof I_1, keyof HaveExtension>]: never; }>(object: I_1): HaveExtension;
create<I extends Exact<DeepPartial<HaveExtension>, I>>(base?: I): HaveExtension;
fromPartial<I extends Exact<DeepPartial<HaveExtension>, I>>(object: I): HaveExtension;
};
type Builtin = Date | Function | Uint8Array | string | number | boolean | undefined;
type DeepPartial<T> = T extends Builtin ? T : T extends Array<infer U> ? Array<DeepPartial<U>> : T extends ReadonlyArray<infer U> ? ReadonlyArray<DeepPartial<U>> : T extends {} ? {
[K in keyof T]?: DeepPartial<T[K]>;
} : Partial<T>;
type KeysOfUnion<T> = T extends T ? keyof T : never;
type Exact<P, I extends P> = P extends Builtin ? P : P & {
[K in keyof P]: Exact<P[K], I[K]>;
} & {
[K in Exclude<keyof I, KeysOfUnion<P>>]: never;
};
export {};
Loading

0 comments on commit ca9b7d4

Please sign in to comment.