Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: listLocalPeers() & local-peers event #360

Merged
merged 14 commits into from
Nov 9, 2023
17 changes: 11 additions & 6 deletions src/discovery/local-discovery.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import pTimeout from 'p-timeout'
import { keyToPublicId } from '@mapeo/crypto'

/** @typedef {{ publicKey: Buffer, secretKey: Buffer }} Keypair */
/** @typedef {import('../utils.js').OpenedNoiseStream<net.Socket>} OpenedNoiseStream */

export const ERR_DUPLICATE = 'Duplicate connection'

/**
* @typedef {Object} DiscoveryEvents
* @property {(connection: import('@hyperswarm/secret-stream')<net.Socket>) => void} connection
* @property {(connection: OpenedNoiseStream) => void} connection
*/

/**
Expand All @@ -24,7 +25,7 @@ export const ERR_DUPLICATE = 'Duplicate connection'
export class LocalDiscovery extends TypedEmitter {
#identityKeypair
#server
/** @type {Map<string, NoiseSecretStream<net.Socket>>} */
/** @type {Map<string, OpenedNoiseStream>} */
#noiseConnections = new Map()
#dnssd
#sm
Expand Down Expand Up @@ -142,14 +143,18 @@ export class LocalDiscovery extends TypedEmitter {
// Further errors will be handled in #handleNoiseStreamConnection()
socket.off('error', onSocketError)
secretStream.off('error', this.#handleSocketError)
this.#handleNoiseStreamConnection(secretStream)
this.#handleNoiseStreamConnection(
// We know the NoiseStream is open at this point, so we can coerce the type
/** @type {OpenedNoiseStream} */
(secretStream)
)
})
}

/**
*
* @param {NoiseSecretStream<net.Socket>} existing
* @param {NoiseSecretStream<net.Socket>} keeping
* @param {OpenedNoiseStream} existing
* @param {OpenedNoiseStream} keeping
*/
#handleConnectionSwap(existing, keeping) {
let closed = false
Expand All @@ -174,7 +179,7 @@ export class LocalDiscovery extends TypedEmitter {

/**
*
* @param {NoiseSecretStream<net.Socket>} conn
* @param {OpenedNoiseStream} conn
* @returns
*/
#handleNoiseStreamConnection(conn) {
Expand Down
2 changes: 1 addition & 1 deletion src/invite-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class InviteApi extends TypedEmitter {

/**
* @param {Object} options
* @param {import('./rpc/index.js').MapeoRPC} options.rpc
* @param {import('./local-peers.js').LocalPeers} options.rpc
* @param {object} options.queries
* @param {(projectId: string) => Promise<boolean>} options.queries.isMember
* @param {(invite: import('./generated/rpc.js').Invite) => Promise<void>} options.queries.addProject
Expand Down
169 changes: 112 additions & 57 deletions src/rpc/index.js → src/local-peers.js
Original file line number Diff line number Diff line change
@@ -1,33 +1,42 @@
// @ts-check
import { TypedEmitter } from 'tiny-typed-emitter'
import Protomux from 'protomux'
import { openedNoiseSecretStream, keyToId } from '../utils.js'
import { openedNoiseSecretStream, keyToId } from './utils.js'
import cenc from 'compact-encoding'
import {
DeviceInfo,
Invite,
InviteResponse,
InviteResponse_Decision,
} from '../generated/rpc.js'
} from './generated/rpc.js'
import pDefer from 'p-defer'

const PROTOCOL_NAME = 'mapeo/rpc'

// Protomux message types depend on the order that messages are added to a
// channel (this needs to remain consistent). To avoid breaking changes, the
// types here should not change.
/** @satisfies {{ [k in keyof typeof import('../generated/rpc.js')]?: number }} */
/** @satisfies {{ [k in keyof typeof import('./generated/rpc.js')]?: number }} */
const MESSAGE_TYPES = {
Invite: 0,
InviteResponse: 1,
DeviceInfo: 2,
}
const MESSAGES_MAX_ID = Math.max.apply(null, [...Object.values(MESSAGE_TYPES)])

/** @typedef {Peer['info']} PeerInfoInternal */
/** @typedef {Omit<PeerInfoInternal, 'status'> & { status: Exclude<PeerInfoInternal['status'], 'connecting'> }} PeerInfo */
/** @typedef {'connecting' | 'connected' | 'disconnected'} PeerState */
/** @typedef {import('type-fest').SetNonNullable<import('../generated/rpc.js').Invite, 'encryptionKeys'>} InviteWithKeys */
/**
* @typedef {object} PeerInfoBase
* @property {string} deviceId
* @property {string | undefined} name
*/
/** @typedef {PeerInfoBase & { status: 'connecting' }} PeerInfoConnecting */
/** @typedef {PeerInfoBase & { status: 'connected', connectedAt: number, protomux: Protomux<import('@hyperswarm/secret-stream')> }} PeerInfoConnected */
/** @typedef {PeerInfoBase & { status: 'disconnected', disconnectedAt: number }} PeerInfoDisconnected */

/** @typedef {PeerInfoConnecting | PeerInfoConnected | PeerInfoDisconnected} PeerInfoInternal */
/** @typedef {PeerInfoConnected | PeerInfoDisconnected} PeerInfo */
/** @typedef {PeerInfoInternal['status']} PeerState */
/** @typedef {import('type-fest').SetNonNullable<import('./generated/rpc.js').Invite, 'encryptionKeys'>} InviteWithKeys */

/**
* @template ValueType
Expand All @@ -44,6 +53,12 @@ class Peer {
#connected
/** @type {Map<string, Array<DeferredPromise<InviteResponse['decision']>>>} */
pendingInvites = new Map()
/** @type {string | undefined} */
#name
#connectedAt = 0
#disconnectedAt = 0
/** @type {Protomux<import('@hyperswarm/secret-stream')>} */
#protomux

/**
* @param {object} options
Expand All @@ -55,41 +70,66 @@ class Peer {
this.#channel = channel
this.#connected = pDefer()
}
/** @returns {PeerInfoInternal} */
get info() {
return {
status: this.#state,
id: keyToId(this.#publicKey),
}
}
/**
* Poor-man's finite state machine. Rather than a `setState` method, only
* allows specific transitions between states.
*
* @param {'connect' | 'disconnect'} type
*/
action(type) {
switch (type) {
case 'connect':
/* c8 ignore next 3 */
if (this.#state !== 'connecting') {
return // TODO: report error - this should not happen
const deviceId = keyToId(this.#publicKey)
switch (this.#state) {
case 'connecting':
return {
status: this.#state,
deviceId,
name: this.#name,
}
this.#state = 'connected'
this.#connected.resolve()
break
case 'disconnect':
/* c8 ignore next */
if (this.#state === 'disconnected') return
this.#state = 'disconnected'
for (const pending of this.pendingInvites.values()) {
for (const { reject } of pending) {
reject(new PeerDisconnectedError())
}
case 'connected':
return {
status: this.#state,
deviceId,
name: this.#name,
connectedAt: this.#connectedAt,
protomux: this.#protomux,
}
this.pendingInvites.clear()
break
case 'disconnected':
return {
status: this.#state,
deviceId,
name: this.#name,
disconnectedAt: this.#disconnectedAt,
}
/* c8 ignore next 4 */
default: {
/** @type {never} */
const _exhaustiveCheck = this.#state
return _exhaustiveCheck
}
}
}
/** @param {Protomux<import('@hyperswarm/secret-stream')>} protomux */
connect(protomux) {
this.#protomux = protomux
/* c8 ignore next 3 */
if (this.#state !== 'connecting') {
return // TODO: report error - this should not happen
}
this.#state = 'connected'
this.#connectedAt = Date.now()
this.#connected.resolve()
}
disconnect() {
// @ts-ignore - easier to ignore this than handle this for TS - avoids holding a reference to old Protomux instances
this.#protomux = undefined
/* c8 ignore next */
if (this.#state === 'disconnected') return
this.#state = 'disconnected'
this.#disconnectedAt = Date.now()
// Can just resolve this rather than reject, because #assertConnected will throw the error
this.#connected.resolve()
for (const pending of this.pendingInvites.values()) {
for (const { reject } of pending) {
reject(new PeerDisconnectedError())
}
}
this.pendingInvites.clear()
}
/** @param {InviteWithKeys} invite */
async sendInvite(invite) {
await this.#assertConnected()
Expand All @@ -111,6 +151,10 @@ class Peer {
const messageType = MESSAGE_TYPES.DeviceInfo
this.#channel.messages[messageType].send(buf)
}
/** @param {DeviceInfo} deviceInfo */
receiveDeviceInfo(deviceInfo) {
this.#name = deviceInfo.name
}
async #assertConnected() {
await this.#connected.promise
if (this.#state === 'connected' && !this.#channel.closed) return
Expand All @@ -120,23 +164,20 @@ class Peer {
}

/**
* @typedef {object} MapeoRPCEvents
* @typedef {object} LocalPeersEvents
* @property {(peers: PeerInfo[]) => void} peers Emitted whenever the connection status of peers changes. An array of peerInfo objects with a peer id and the peer connection status
* @property {(peer: PeerInfoConnected) => void} peer-add Emitted when a new peer is connected
* @property {(peerId: string, invite: InviteWithKeys) => void} invite Emitted when an invite is received
* @property {(deviceInfo: DeviceInfo & { deviceId: string }) => void} device-info Emitted when we receive device info for a device
* @property {(discoveryKey: Buffer, stream: import('./types.js').ReplicationStream) => void} discovery-key Emitted when a new hypercore is replicated (by a peer) to a peer replication stream (passed as the second parameter)
*/

/** @extends {TypedEmitter<MapeoRPCEvents>} */
export class MapeoRPC extends TypedEmitter {
/** @extends {TypedEmitter<LocalPeersEvents>} */
export class LocalPeers extends TypedEmitter {
/** @type {Map<string, Peer>} */
#peers = new Map()
/** @type {Set<Promise<any>>} */
#opening = new Set()

constructor() {
super()
}

static InviteResponse = InviteResponse_Decision

/**
Expand Down Expand Up @@ -221,16 +262,28 @@ export class MapeoRPC extends TypedEmitter {
/**
* Connect to a peer over an existing NoiseSecretStream
*
* @param {import('../types.js').NoiseStream | import('../types.js').ProtocolStream} stream a NoiseSecretStream from @hyperswarm/secret-stream
* @param {import('./types.js').NoiseStream<any>} stream a NoiseSecretStream from @hyperswarm/secret-stream
* @returns {import('./types.js').ReplicationStream}
*/
connect(stream) {
if (!stream.noiseStream) throw new Error('Invalid stream')
const protomux =
stream.userData && Protomux.isProtomux(stream.userData)
? stream.userData
: Protomux.from(stream)
stream.userData = protomux
this.#opening.add(stream.opened)

protomux.pair(
{ protocol: 'hypercore/alpha' },
/** @param {Buffer} discoveryKey */ async (discoveryKey) => {
this.emit('discovery-key', discoveryKey, stream.rawStream)
}
)

// No need to connect error handler to stream because Protomux does this,
// and errors are eventually handled by #closePeer

// noiseSecretStream.remotePublicKey can be null before the stream has
// opened, so this helped awaits the open
openedNoiseSecretStream(stream).then((stream) => {
Expand All @@ -254,7 +307,7 @@ export class MapeoRPC extends TypedEmitter {
userData: null,
protocol: PROTOCOL_NAME,
messages,
onopen: this.#openPeer.bind(this, remotePublicKey),
onopen: this.#openPeer.bind(this, remotePublicKey, protomux),
onclose: this.#closePeer.bind(this, remotePublicKey),
})
channel.open()
Expand All @@ -263,27 +316,28 @@ export class MapeoRPC extends TypedEmitter {
const existingPeer = this.#peers.get(peerId)
/* c8 ignore next 3 */
if (existingPeer && existingPeer.info.status !== 'disconnected') {
existingPeer.action('disconnect') // Should not happen, but in case
existingPeer.disconnect() // Should not happen, but in case
}
const peer = new Peer({ publicKey: remotePublicKey, channel })
this.#peers.set(peerId, peer)
// Do not emit peers now - will emit when connected
})

return stream
return stream.rawStream
}

/** @param {Buffer} publicKey */
#openPeer(publicKey) {
/**
* @param {Buffer} publicKey
* @param {Protomux<import('@hyperswarm/secret-stream')>} protomux
*/
#openPeer(publicKey, protomux) {
const peerId = keyToId(publicKey)
const peer = this.#peers.get(peerId)
/* c8 ignore next */
if (!peer) return // TODO: report error - this should not happen
// No-op if no change in state
/* c8 ignore next */
if (peer.info.status === 'connected') return // TODO: report error - this should not happen
peer.action('connect')
peer.connect(protomux)
this.#emitPeers()
this.emit('peer-add', /** @type {PeerInfoConnected} */ (peer.info))
}

/** @param {Buffer} publicKey */
Expand All @@ -296,7 +350,7 @@ export class MapeoRPC extends TypedEmitter {
/* c8 ignore next */
if (peer.info.status === 'disconnected') return
// TODO: Track reasons for closing
peer.action('disconnect')
peer.disconnect()
this.#emitPeers()
}

Expand Down Expand Up @@ -348,7 +402,8 @@ export class MapeoRPC extends TypedEmitter {
}
case 'DeviceInfo': {
const deviceInfo = DeviceInfo.decode(value)
this.emit('device-info', { ...deviceInfo, deviceId: peerId })
peer.receiveDeviceInfo(deviceInfo)
this.#emitPeers()
break
}
/* c8 ignore next 5 */
Expand Down
Loading