Skip to content

Commit

Permalink
feat: MapeoRPC -> LocalPeers (#356)
Browse files Browse the repository at this point in the history
  • Loading branch information
gmaclennan authored Oct 30, 2023
1 parent 63c2cd6 commit 1f4abac
Show file tree
Hide file tree
Showing 13 changed files with 299 additions and 226 deletions.
17 changes: 11 additions & 6 deletions src/discovery/local-discovery.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import pTimeout from 'p-timeout'
import { keyToPublicId } from '@mapeo/crypto'

/** @typedef {{ publicKey: Buffer, secretKey: Buffer }} Keypair */
/** @typedef {import('../utils.js').OpenedNoiseStream<net.Socket>} OpenedNoiseStream */

export const ERR_DUPLICATE = 'Duplicate connection'

/**
* @typedef {Object} DiscoveryEvents
* @property {(connection: import('@hyperswarm/secret-stream')<net.Socket>) => void} connection
* @property {(connection: OpenedNoiseStream) => void} connection
*/

/**
Expand All @@ -24,7 +25,7 @@ export const ERR_DUPLICATE = 'Duplicate connection'
export class LocalDiscovery extends TypedEmitter {
#identityKeypair
#server
/** @type {Map<string, NoiseSecretStream<net.Socket>>} */
/** @type {Map<string, OpenedNoiseStream>} */
#noiseConnections = new Map()
#dnssd
#sm
Expand Down Expand Up @@ -142,14 +143,18 @@ export class LocalDiscovery extends TypedEmitter {
// Further errors will be handled in #handleNoiseStreamConnection()
socket.off('error', onSocketError)
secretStream.off('error', this.#handleSocketError)
this.#handleNoiseStreamConnection(secretStream)
this.#handleNoiseStreamConnection(
// We know the NoiseStream is open at this point, so we can coerce the type
/** @type {OpenedNoiseStream} */
(secretStream)
)
})
}

/**
*
* @param {NoiseSecretStream<net.Socket>} existing
* @param {NoiseSecretStream<net.Socket>} keeping
* @param {OpenedNoiseStream} existing
* @param {OpenedNoiseStream} keeping
*/
#handleConnectionSwap(existing, keeping) {
let closed = false
Expand All @@ -174,7 +179,7 @@ export class LocalDiscovery extends TypedEmitter {

/**
*
* @param {NoiseSecretStream<net.Socket>} conn
* @param {OpenedNoiseStream} conn
* @returns
*/
#handleNoiseStreamConnection(conn) {
Expand Down
2 changes: 1 addition & 1 deletion src/invite-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class InviteApi extends TypedEmitter {

/**
* @param {Object} options
* @param {import('./rpc/index.js').MapeoRPC} options.rpc
* @param {import('./local-peers.js').LocalPeers} options.rpc
* @param {object} options.queries
* @param {(projectId: string) => Promise<boolean>} options.queries.isMember
* @param {(invite: import('./generated/rpc.js').Invite) => Promise<void>} options.queries.addProject
Expand Down
162 changes: 104 additions & 58 deletions src/rpc/index.js → src/local-peers.js
Original file line number Diff line number Diff line change
@@ -1,33 +1,42 @@
// @ts-check
import { TypedEmitter } from 'tiny-typed-emitter'
import Protomux from 'protomux'
import { openedNoiseSecretStream, keyToId } from '../utils.js'
import { openedNoiseSecretStream, keyToId } from './utils.js'
import cenc from 'compact-encoding'
import {
DeviceInfo,
Invite,
InviteResponse,
InviteResponse_Decision,
} from '../generated/rpc.js'
} from './generated/rpc.js'
import pDefer from 'p-defer'

const PROTOCOL_NAME = 'mapeo/rpc'

// Protomux message types depend on the order that messages are added to a
// channel (this needs to remain consistent). To avoid breaking changes, the
// types here should not change.
/** @satisfies {{ [k in keyof typeof import('../generated/rpc.js')]?: number }} */
/** @satisfies {{ [k in keyof typeof import('./generated/rpc.js')]?: number }} */
const MESSAGE_TYPES = {
Invite: 0,
InviteResponse: 1,
DeviceInfo: 2,
}
const MESSAGES_MAX_ID = Math.max.apply(null, [...Object.values(MESSAGE_TYPES)])

/** @typedef {Peer['info']} PeerInfoInternal */
/** @typedef {Omit<PeerInfoInternal, 'status'> & { status: Exclude<PeerInfoInternal['status'], 'connecting'> }} PeerInfo */
/** @typedef {'connecting' | 'connected' | 'disconnected'} PeerState */
/** @typedef {import('type-fest').SetNonNullable<import('../generated/rpc.js').Invite, 'encryptionKeys'>} InviteWithKeys */
/**
* @typedef {object} PeerInfoBase
* @property {string} deviceId
* @property {string | undefined} name
*/
/** @typedef {PeerInfoBase & { status: 'connecting' }} PeerInfoConnecting */
/** @typedef {PeerInfoBase & { status: 'connected', connectedAt: number, protomux: Protomux }} PeerInfoConnected */
/** @typedef {PeerInfoBase & { status: 'disconnected', disconnectedAt: number }} PeerInfoDisconnected */

/** @typedef {PeerInfoConnecting | PeerInfoConnected | PeerInfoDisconnected} PeerInfoInternal */
/** @typedef {PeerInfoConnected | PeerInfoDisconnected} PeerInfo */
/** @typedef {PeerInfoInternal['status']} PeerState */
/** @typedef {import('type-fest').SetNonNullable<import('./generated/rpc.js').Invite, 'encryptionKeys'>} InviteWithKeys */

/**
* @template ValueType
Expand All @@ -44,6 +53,12 @@ class Peer {
#connected
/** @type {Map<string, Array<DeferredPromise<InviteResponse['decision']>>>} */
pendingInvites = new Map()
/** @type {string | undefined} */
#name
#connectedAt = 0
#disconnectedAt = 0
/** @type {Protomux} */
#protomux

/**
* @param {object} options
Expand All @@ -55,40 +70,65 @@ class Peer {
this.#channel = channel
this.#connected = pDefer()
}
/** @returns {PeerInfoInternal} */
get info() {
return {
status: this.#state,
id: keyToId(this.#publicKey),
}
}
/**
* Poor-man's finite state machine. Rather than a `setState` method, only
* allows specific transitions between states.
*
* @param {'connect' | 'disconnect'} type
*/
action(type) {
switch (type) {
case 'connect':
/* c8 ignore next 3 */
if (this.#state !== 'connecting') {
return // TODO: report error - this should not happen
const deviceId = keyToId(this.#publicKey)
switch (this.#state) {
case 'connecting':
return {
status: this.#state,
deviceId,
name: this.#name,
}
this.#state = 'connected'
this.#connected.resolve()
break
case 'disconnect':
/* c8 ignore next */
if (this.#state === 'disconnected') return
this.#state = 'disconnected'
for (const pending of this.pendingInvites.values()) {
for (const { reject } of pending) {
reject(new PeerDisconnectedError())
}
case 'connected':
return {
status: this.#state,
deviceId,
name: this.#name,
connectedAt: this.#connectedAt,
protomux: this.#protomux,
}
this.pendingInvites.clear()
break
case 'disconnected':
return {
status: this.#state,
deviceId,
name: this.#name,
disconnectedAt: this.#disconnectedAt,
}
/* c8 ignore next 4 */
default: {
/** @type {never} */
const _exhaustiveCheck = this.#state
return _exhaustiveCheck
}
}
}
/** @param {Protomux} protomux */
connect(protomux) {
this.#protomux = protomux
/* c8 ignore next 3 */
if (this.#state !== 'connecting') {
return // TODO: report error - this should not happen
}
this.#state = 'connected'
this.#connectedAt = Date.now()
this.#connected.resolve()
}
disconnect() {
// @ts-ignore - easier to ignore this than handle this for TS - avoids holding a reference to old Protomux instances
this.#protomux = undefined
/* c8 ignore next */
if (this.#state === 'disconnected') return
this.#state = 'disconnected'
this.#disconnectedAt = Date.now()
// Can just resolve this rather than reject, because #assertConnected will throw the error
this.#connected.resolve()
for (const pending of this.pendingInvites.values()) {
for (const { reject } of pending) {
reject(new PeerDisconnectedError())
}
}
this.pendingInvites.clear()
}
/** @param {InviteWithKeys} invite */
async sendInvite(invite) {
Expand All @@ -111,6 +151,10 @@ class Peer {
const messageType = MESSAGE_TYPES.DeviceInfo
this.#channel.messages[messageType].send(buf)
}
/** @param {DeviceInfo} deviceInfo */
receiveDeviceInfo(deviceInfo) {
this.#name = deviceInfo.name
}
async #assertConnected() {
await this.#connected.promise
if (this.#state === 'connected' && !this.#channel.closed) return
Expand All @@ -120,23 +164,18 @@ class Peer {
}

/**
* @typedef {object} MapeoRPCEvents
* @typedef {object} LocalPeersEvents
* @property {(peers: PeerInfo[]) => void} peers Emitted whenever the connection status of peers changes. An array of peerInfo objects with a peer id and the peer connection status
* @property {(peerId: string, invite: InviteWithKeys) => void} invite Emitted when an invite is received
* @property {(deviceInfo: DeviceInfo & { deviceId: string }) => void} device-info Emitted when we receive device info for a device
*/

/** @extends {TypedEmitter<MapeoRPCEvents>} */
export class MapeoRPC extends TypedEmitter {
/** @extends {TypedEmitter<LocalPeersEvents>} */
export class LocalPeers extends TypedEmitter {
/** @type {Map<string, Peer>} */
#peers = new Map()
/** @type {Set<Promise<any>>} */
#opening = new Set()

constructor() {
super()
}

static InviteResponse = InviteResponse_Decision

/**
Expand Down Expand Up @@ -221,16 +260,21 @@ export class MapeoRPC extends TypedEmitter {
/**
* Connect to a peer over an existing NoiseSecretStream
*
* @param {import('../types.js').NoiseStream | import('../types.js').ProtocolStream} stream a NoiseSecretStream from @hyperswarm/secret-stream
* @param {import('./types.js').NoiseStream<any>} stream a NoiseSecretStream from @hyperswarm/secret-stream
* @returns {import('./types.js').ReplicationStream}
*/
connect(stream) {
if (!stream.noiseStream) throw new Error('Invalid stream')
const protomux =
stream.userData && Protomux.isProtomux(stream.userData)
? stream.userData
: Protomux.from(stream)
stream.userData = protomux
this.#opening.add(stream.opened)

// No need to connect error handler to stream because Protomux does this,
// and errors are eventually handled by #closePeer

// noiseSecretStream.remotePublicKey can be null before the stream has
// opened, so this helped awaits the open
openedNoiseSecretStream(stream).then((stream) => {
Expand All @@ -254,7 +298,7 @@ export class MapeoRPC extends TypedEmitter {
userData: null,
protocol: PROTOCOL_NAME,
messages,
onopen: this.#openPeer.bind(this, remotePublicKey),
onopen: this.#openPeer.bind(this, remotePublicKey, protomux),
onclose: this.#closePeer.bind(this, remotePublicKey),
})
channel.open()
Expand All @@ -263,27 +307,28 @@ export class MapeoRPC extends TypedEmitter {
const existingPeer = this.#peers.get(peerId)
/* c8 ignore next 3 */
if (existingPeer && existingPeer.info.status !== 'disconnected') {
existingPeer.action('disconnect') // Should not happen, but in case
existingPeer.disconnect() // Should not happen, but in case
}
const peer = new Peer({ publicKey: remotePublicKey, channel })
this.#peers.set(peerId, peer)
// Do not emit peers now - will emit when connected
})

return stream
return stream.rawStream
}

/** @param {Buffer} publicKey */
#openPeer(publicKey) {
/**
* @param {Buffer} publicKey
* @param {Protomux} protomux
*/
#openPeer(publicKey, protomux) {
const peerId = keyToId(publicKey)
const peer = this.#peers.get(peerId)
/* c8 ignore next */
if (!peer) return // TODO: report error - this should not happen
// No-op if no change in state
/* c8 ignore next */
if (peer.info.status === 'connected') return // TODO: report error - this should not happen
peer.action('connect')
this.#emitPeers()
const wasConnected = peer.info.status === 'connected'
peer.connect(protomux)
if (!wasConnected) this.#emitPeers()
}

/** @param {Buffer} publicKey */
Expand All @@ -296,7 +341,7 @@ export class MapeoRPC extends TypedEmitter {
/* c8 ignore next */
if (peer.info.status === 'disconnected') return
// TODO: Track reasons for closing
peer.action('disconnect')
peer.disconnect()
this.#emitPeers()
}

Expand Down Expand Up @@ -348,7 +393,8 @@ export class MapeoRPC extends TypedEmitter {
}
case 'DeviceInfo': {
const deviceInfo = DeviceInfo.decode(value)
this.emit('device-info', { ...deviceInfo, deviceId: peerId })
peer.receiveDeviceInfo(deviceInfo)
this.#emitPeers()
break
}
/* c8 ignore next 5 */
Expand Down
4 changes: 2 additions & 2 deletions src/mapeo-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
projectKeyToPublicId,
} from './utils.js'
import { RandomAccessFilePool } from './core-manager/random-access-file-pool.js'
import { MapeoRPC } from './rpc/index.js'
import { LocalPeers } from './local-peers.js'
import { InviteApi } from './invite-api.js'

/** @typedef {import("@mapeo/schema").ProjectSettingsValue} ProjectValue */
Expand Down Expand Up @@ -69,7 +69,7 @@ export class MapeoManager {
migrationsFolder: new URL('../drizzle/client', import.meta.url).pathname,
})

this.#rpc = new MapeoRPC()
this.#rpc = new LocalPeers()
this.#keyManager = new KeyManager(rootKey)
this.#deviceId = getDeviceId(this.#keyManager)
this.#projectSettingsIndexWriter = new IndexWriter({
Expand Down
2 changes: 1 addition & 1 deletion src/mapeo-project.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export class MapeoProject {
* @param {import('drizzle-orm/better-sqlite3').BetterSQLite3Database} opts.sharedDb
* @param {IndexWriter} opts.sharedIndexWriter
* @param {import('./types.js').CoreStorage} opts.coreStorage Folder to store all hypercore data
* @param {import('./rpc/index.js').MapeoRPC} opts.rpc
* @param {import('./local-peers.js').LocalPeers} opts.rpc
*
*/
constructor({
Expand Down
2 changes: 1 addition & 1 deletion src/member-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export class MemberApi extends TypedEmitter {
* @param {import('./core-ownership.js').CoreOwnership} opts.coreOwnership
* @param {import('./generated/keys.js').EncryptionKeys} opts.encryptionKeys
* @param {Buffer} opts.projectKey
* @param {import('./rpc/index.js').MapeoRPC} opts.rpc
* @param {import('./local-peers.js').LocalPeers} opts.rpc
* @param {Object} opts.dataTypes
* @param {Pick<DeviceInfoDataType, 'getByDocId' | 'getMany'>} opts.dataTypes.deviceInfo
* @param {Pick<ProjectDataType, 'getByDocId'>} opts.dataTypes.project
Expand Down
Loading

0 comments on commit 1f4abac

Please sign in to comment.