Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: MapeoRPC -> LocalPeers #356

Merged
merged 9 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions src/discovery/local-discovery.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import pTimeout from 'p-timeout'
import { keyToPublicId } from '@mapeo/crypto'

/** @typedef {{ publicKey: Buffer, secretKey: Buffer }} Keypair */
/** @typedef {import('../utils.js').OpenedNoiseStream<net.Socket>} OpenedNoiseStream */

export const ERR_DUPLICATE = 'Duplicate connection'

/**
* @typedef {Object} DiscoveryEvents
* @property {(connection: import('@hyperswarm/secret-stream')<net.Socket>) => void} connection
* @property {(connection: OpenedNoiseStream) => void} connection
*/

/**
Expand All @@ -24,7 +25,7 @@ export const ERR_DUPLICATE = 'Duplicate connection'
export class LocalDiscovery extends TypedEmitter {
#identityKeypair
#server
/** @type {Map<string, NoiseSecretStream<net.Socket>>} */
/** @type {Map<string, OpenedNoiseStream>} */
#noiseConnections = new Map()
#dnssd
#sm
Expand Down Expand Up @@ -142,14 +143,18 @@ export class LocalDiscovery extends TypedEmitter {
// Further errors will be handled in #handleNoiseStreamConnection()
socket.off('error', onSocketError)
secretStream.off('error', this.#handleSocketError)
this.#handleNoiseStreamConnection(secretStream)
this.#handleNoiseStreamConnection(
// We know the NoiseStream is open at this point, so we can coerce the type
/** @type {OpenedNoiseStream} */
(secretStream)
)
})
}

/**
*
* @param {NoiseSecretStream<net.Socket>} existing
* @param {NoiseSecretStream<net.Socket>} keeping
* @param {OpenedNoiseStream} existing
* @param {OpenedNoiseStream} keeping
*/
#handleConnectionSwap(existing, keeping) {
let closed = false
Expand All @@ -174,7 +179,7 @@ export class LocalDiscovery extends TypedEmitter {

/**
*
* @param {NoiseSecretStream<net.Socket>} conn
* @param {OpenedNoiseStream} conn
* @returns
*/
#handleNoiseStreamConnection(conn) {
Expand Down
2 changes: 1 addition & 1 deletion src/invite-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class InviteApi extends TypedEmitter {

/**
* @param {Object} options
* @param {import('./rpc/index.js').MapeoRPC} options.rpc
* @param {import('./local-peers.js').LocalPeers} options.rpc
* @param {object} options.queries
* @param {(projectId: string) => Promise<boolean>} options.queries.isMember
* @param {(invite: import('./generated/rpc.js').Invite) => Promise<void>} options.queries.addProject
Expand Down
162 changes: 104 additions & 58 deletions src/rpc/index.js → src/local-peers.js
Original file line number Diff line number Diff line change
@@ -1,33 +1,42 @@
// @ts-check
import { TypedEmitter } from 'tiny-typed-emitter'
import Protomux from 'protomux'
import { openedNoiseSecretStream, keyToId } from '../utils.js'
import { openedNoiseSecretStream, keyToId } from './utils.js'
import cenc from 'compact-encoding'
import {
DeviceInfo,
Invite,
InviteResponse,
InviteResponse_Decision,
} from '../generated/rpc.js'
} from './generated/rpc.js'
import pDefer from 'p-defer'

const PROTOCOL_NAME = 'mapeo/rpc'

// Protomux message types depend on the order that messages are added to a
// channel (this needs to remain consistent). To avoid breaking changes, the
// types here should not change.
/** @satisfies {{ [k in keyof typeof import('../generated/rpc.js')]?: number }} */
/** @satisfies {{ [k in keyof typeof import('./generated/rpc.js')]?: number }} */
const MESSAGE_TYPES = {
Invite: 0,
InviteResponse: 1,
DeviceInfo: 2,
}
const MESSAGES_MAX_ID = Math.max.apply(null, [...Object.values(MESSAGE_TYPES)])

/** @typedef {Peer['info']} PeerInfoInternal */
/** @typedef {Omit<PeerInfoInternal, 'status'> & { status: Exclude<PeerInfoInternal['status'], 'connecting'> }} PeerInfo */
/** @typedef {'connecting' | 'connected' | 'disconnected'} PeerState */
/** @typedef {import('type-fest').SetNonNullable<import('../generated/rpc.js').Invite, 'encryptionKeys'>} InviteWithKeys */
/**
* @typedef {object} PeerInfoBase
* @property {string} deviceId
* @property {string | undefined} name
*/
/** @typedef {PeerInfoBase & { status: 'connecting' }} PeerInfoConnecting */
/** @typedef {PeerInfoBase & { status: 'connected', connectedAt: number, protomux: Protomux }} PeerInfoConnected */
/** @typedef {PeerInfoBase & { status: 'disconnected', disconnectedAt: number }} PeerInfoDisconnected */

/** @typedef {PeerInfoConnecting | PeerInfoConnected | PeerInfoDisconnected} PeerInfoInternal */
/** @typedef {PeerInfoConnected | PeerInfoDisconnected} PeerInfo */
/** @typedef {PeerInfoInternal['status']} PeerState */
/** @typedef {import('type-fest').SetNonNullable<import('./generated/rpc.js').Invite, 'encryptionKeys'>} InviteWithKeys */

/**
* @template ValueType
Expand All @@ -44,6 +53,12 @@ class Peer {
#connected
/** @type {Map<string, Array<DeferredPromise<InviteResponse['decision']>>>} */
pendingInvites = new Map()
/** @type {string | undefined} */
#name
#connectedAt = 0
#disconnectedAt = 0
achou11 marked this conversation as resolved.
Show resolved Hide resolved
/** @type {Protomux} */
#protomux

/**
* @param {object} options
Expand All @@ -55,40 +70,65 @@ class Peer {
this.#channel = channel
this.#connected = pDefer()
}
/** @returns {PeerInfoInternal} */
get info() {
return {
status: this.#state,
id: keyToId(this.#publicKey),
}
}
/**
* Poor-man's finite state machine. Rather than a `setState` method, only
* allows specific transitions between states.
*
* @param {'connect' | 'disconnect'} type
*/
action(type) {
switch (type) {
case 'connect':
/* c8 ignore next 3 */
if (this.#state !== 'connecting') {
return // TODO: report error - this should not happen
const deviceId = keyToId(this.#publicKey)
switch (this.#state) {
case 'connecting':
return {
status: this.#state,
deviceId,
name: this.#name,
}
this.#state = 'connected'
this.#connected.resolve()
break
case 'disconnect':
/* c8 ignore next */
if (this.#state === 'disconnected') return
this.#state = 'disconnected'
for (const pending of this.pendingInvites.values()) {
for (const { reject } of pending) {
reject(new PeerDisconnectedError())
}
case 'connected':
return {
status: this.#state,
deviceId,
name: this.#name,
connectedAt: this.#connectedAt,
protomux: this.#protomux,
}
this.pendingInvites.clear()
break
case 'disconnected':
return {
status: this.#state,
deviceId,
name: this.#name,
disconnectedAt: this.#disconnectedAt,
}
/* c8 ignore next 4 */
default: {
/** @type {never} */
const _exhaustiveCheck = this.#state
return _exhaustiveCheck
}
}
}
/** @param {Protomux} protomux */
connect(protomux) {
this.#protomux = protomux
/* c8 ignore next 3 */
if (this.#state !== 'connecting') {
return // TODO: report error - this should not happen
}
this.#state = 'connected'
this.#connectedAt = Date.now()
this.#connected.resolve()
}
disconnect() {
// @ts-ignore - easier to ignore this than handle this for TS - avoids holding a reference to old Protomux instances
this.#protomux = undefined
/* c8 ignore next */
if (this.#state === 'disconnected') return
this.#state = 'disconnected'
this.#disconnectedAt = Date.now()
// Can just resolve this rather than reject, because #assertConnected will throw the error
this.#connected.resolve()
for (const pending of this.pendingInvites.values()) {
for (const { reject } of pending) {
reject(new PeerDisconnectedError())
}
}
this.pendingInvites.clear()
}
/** @param {InviteWithKeys} invite */
async sendInvite(invite) {
Expand All @@ -111,6 +151,10 @@ class Peer {
const messageType = MESSAGE_TYPES.DeviceInfo
this.#channel.messages[messageType].send(buf)
}
/** @param {DeviceInfo} deviceInfo */
receiveDeviceInfo(deviceInfo) {
this.#name = deviceInfo.name
}
async #assertConnected() {
await this.#connected.promise
if (this.#state === 'connected' && !this.#channel.closed) return
Expand All @@ -120,23 +164,18 @@ class Peer {
}

/**
* @typedef {object} MapeoRPCEvents
* @typedef {object} LocalPeersEvents
* @property {(peers: PeerInfo[]) => void} peers Emitted whenever the connection status of peers changes. An array of peerInfo objects with a peer id and the peer connection status
* @property {(peerId: string, invite: InviteWithKeys) => void} invite Emitted when an invite is received
* @property {(deviceInfo: DeviceInfo & { deviceId: string }) => void} device-info Emitted when we receive device info for a device
*/

/** @extends {TypedEmitter<MapeoRPCEvents>} */
export class MapeoRPC extends TypedEmitter {
/** @extends {TypedEmitter<LocalPeersEvents>} */
export class LocalPeers extends TypedEmitter {
/** @type {Map<string, Peer>} */
#peers = new Map()
/** @type {Set<Promise<any>>} */
#opening = new Set()

constructor() {
super()
}

static InviteResponse = InviteResponse_Decision

/**
Expand Down Expand Up @@ -221,16 +260,21 @@ export class MapeoRPC extends TypedEmitter {
/**
* Connect to a peer over an existing NoiseSecretStream
*
* @param {import('../types.js').NoiseStream | import('../types.js').ProtocolStream} stream a NoiseSecretStream from @hyperswarm/secret-stream
* @param {import('./types.js').NoiseStream<any>} stream a NoiseSecretStream from @hyperswarm/secret-stream
* @returns {import('./types.js').ReplicationStream}
*/
connect(stream) {
if (!stream.noiseStream) throw new Error('Invalid stream')
const protomux =
stream.userData && Protomux.isProtomux(stream.userData)
? stream.userData
: Protomux.from(stream)
stream.userData = protomux
this.#opening.add(stream.opened)

// No need to connect error handler to stream because Protomux does this,
// and errors are eventually handled by #closePeer

// noiseSecretStream.remotePublicKey can be null before the stream has
// opened, so this helped awaits the open
openedNoiseSecretStream(stream).then((stream) => {
Expand All @@ -254,7 +298,7 @@ export class MapeoRPC extends TypedEmitter {
userData: null,
protocol: PROTOCOL_NAME,
messages,
onopen: this.#openPeer.bind(this, remotePublicKey),
onopen: this.#openPeer.bind(this, remotePublicKey, protomux),
onclose: this.#closePeer.bind(this, remotePublicKey),
})
channel.open()
Expand All @@ -263,27 +307,28 @@ export class MapeoRPC extends TypedEmitter {
const existingPeer = this.#peers.get(peerId)
/* c8 ignore next 3 */
if (existingPeer && existingPeer.info.status !== 'disconnected') {
existingPeer.action('disconnect') // Should not happen, but in case
existingPeer.disconnect() // Should not happen, but in case
}
const peer = new Peer({ publicKey: remotePublicKey, channel })
this.#peers.set(peerId, peer)
// Do not emit peers now - will emit when connected
})

return stream
return stream.rawStream
}

/** @param {Buffer} publicKey */
#openPeer(publicKey) {
/**
* @param {Buffer} publicKey
* @param {Protomux} protomux
*/
#openPeer(publicKey, protomux) {
const peerId = keyToId(publicKey)
const peer = this.#peers.get(peerId)
/* c8 ignore next */
if (!peer) return // TODO: report error - this should not happen
// No-op if no change in state
/* c8 ignore next */
if (peer.info.status === 'connected') return // TODO: report error - this should not happen
peer.action('connect')
this.#emitPeers()
const wasConnected = peer.info.status === 'connected'
peer.connect(protomux)
if (!wasConnected) this.#emitPeers()
}

/** @param {Buffer} publicKey */
Expand All @@ -296,7 +341,7 @@ export class MapeoRPC extends TypedEmitter {
/* c8 ignore next */
if (peer.info.status === 'disconnected') return
// TODO: Track reasons for closing
peer.action('disconnect')
peer.disconnect()
this.#emitPeers()
}

Expand Down Expand Up @@ -348,7 +393,8 @@ export class MapeoRPC extends TypedEmitter {
}
case 'DeviceInfo': {
const deviceInfo = DeviceInfo.decode(value)
this.emit('device-info', { ...deviceInfo, deviceId: peerId })
peer.receiveDeviceInfo(deviceInfo)
this.#emitPeers()
break
}
/* c8 ignore next 5 */
Expand Down
4 changes: 2 additions & 2 deletions src/mapeo-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
projectKeyToPublicId,
} from './utils.js'
import { RandomAccessFilePool } from './core-manager/random-access-file-pool.js'
import { MapeoRPC } from './rpc/index.js'
import { LocalPeers } from './local-peers.js'
import { InviteApi } from './invite-api.js'

/** @typedef {import("@mapeo/schema").ProjectSettingsValue} ProjectValue */
Expand Down Expand Up @@ -69,7 +69,7 @@ export class MapeoManager {
migrationsFolder: new URL('../drizzle/client', import.meta.url).pathname,
})

this.#rpc = new MapeoRPC()
this.#rpc = new LocalPeers()
this.#keyManager = new KeyManager(rootKey)
this.#deviceId = getDeviceId(this.#keyManager)
this.#projectSettingsIndexWriter = new IndexWriter({
Expand Down
2 changes: 1 addition & 1 deletion src/mapeo-project.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export class MapeoProject {
* @param {import('drizzle-orm/better-sqlite3').BetterSQLite3Database} opts.sharedDb
* @param {IndexWriter} opts.sharedIndexWriter
* @param {import('./types.js').CoreStorage} opts.coreStorage Folder to store all hypercore data
* @param {import('./rpc/index.js').MapeoRPC} opts.rpc
* @param {import('./local-peers.js').LocalPeers} opts.rpc
*
*/
constructor({
Expand Down
2 changes: 1 addition & 1 deletion src/member-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export class MemberApi extends TypedEmitter {
* @param {import('./core-ownership.js').CoreOwnership} opts.coreOwnership
* @param {import('./generated/keys.js').EncryptionKeys} opts.encryptionKeys
* @param {Buffer} opts.projectKey
* @param {import('./rpc/index.js').MapeoRPC} opts.rpc
* @param {import('./local-peers.js').LocalPeers} opts.rpc
* @param {Object} opts.dataTypes
* @param {Pick<DeviceInfoDataType, 'getByDocId' | 'getMany'>} opts.dataTypes.deviceInfo
* @param {Pick<ProjectDataType, 'getByDocId'>} opts.dataTypes.project
Expand Down
Loading
Loading