diff --git a/src/discovery/local-discovery.js b/src/discovery/local-discovery.js index 33ec0863c..6c1ade7bb 100644 --- a/src/discovery/local-discovery.js +++ b/src/discovery/local-discovery.js @@ -10,12 +10,13 @@ import pTimeout from 'p-timeout' import { keyToPublicId } from '@mapeo/crypto' /** @typedef {{ publicKey: Buffer, secretKey: Buffer }} Keypair */ +/** @typedef {import('../utils.js').OpenedNoiseStream} OpenedNoiseStream */ export const ERR_DUPLICATE = 'Duplicate connection' /** * @typedef {Object} DiscoveryEvents - * @property {(connection: import('@hyperswarm/secret-stream')) => void} connection + * @property {(connection: OpenedNoiseStream) => void} connection */ /** @@ -24,7 +25,7 @@ export const ERR_DUPLICATE = 'Duplicate connection' export class LocalDiscovery extends TypedEmitter { #identityKeypair #server - /** @type {Map>} */ + /** @type {Map} */ #noiseConnections = new Map() #dnssd #sm @@ -142,14 +143,18 @@ export class LocalDiscovery extends TypedEmitter { // Further errors will be handled in #handleNoiseStreamConnection() socket.off('error', onSocketError) secretStream.off('error', this.#handleSocketError) - this.#handleNoiseStreamConnection(secretStream) + this.#handleNoiseStreamConnection( + // We know the NoiseStream is open at this point, so we can coerce the type + /** @type {OpenedNoiseStream} */ + (secretStream) + ) }) } /** * - * @param {NoiseSecretStream} existing - * @param {NoiseSecretStream} keeping + * @param {OpenedNoiseStream} existing + * @param {OpenedNoiseStream} keeping */ #handleConnectionSwap(existing, keeping) { let closed = false @@ -174,7 +179,7 @@ export class LocalDiscovery extends TypedEmitter { /** * - * @param {NoiseSecretStream} conn + * @param {OpenedNoiseStream} conn * @returns */ #handleNoiseStreamConnection(conn) { diff --git a/src/invite-api.js b/src/invite-api.js index cb871d0aa..03dd72970 100644 --- a/src/invite-api.js +++ b/src/invite-api.js @@ -47,7 +47,7 @@ export class InviteApi extends TypedEmitter { /** * @param {Object} options - * @param {import('./rpc/index.js').MapeoRPC} options.rpc + * @param {import('./local-peers.js').LocalPeers} options.rpc * @param {object} options.queries * @param {(projectId: string) => Promise} options.queries.isMember * @param {(invite: import('./generated/rpc.js').Invite) => Promise} options.queries.addProject diff --git a/src/rpc/index.js b/src/local-peers.js similarity index 74% rename from src/rpc/index.js rename to src/local-peers.js index f3551c104..5989a5bd8 100644 --- a/src/rpc/index.js +++ b/src/local-peers.js @@ -1,14 +1,14 @@ // @ts-check import { TypedEmitter } from 'tiny-typed-emitter' import Protomux from 'protomux' -import { openedNoiseSecretStream, keyToId } from '../utils.js' +import { openedNoiseSecretStream, keyToId } from './utils.js' import cenc from 'compact-encoding' import { DeviceInfo, Invite, InviteResponse, InviteResponse_Decision, -} from '../generated/rpc.js' +} from './generated/rpc.js' import pDefer from 'p-defer' const PROTOCOL_NAME = 'mapeo/rpc' @@ -16,7 +16,7 @@ const PROTOCOL_NAME = 'mapeo/rpc' // Protomux message types depend on the order that messages are added to a // channel (this needs to remain consistent). To avoid breaking changes, the // types here should not change. -/** @satisfies {{ [k in keyof typeof import('../generated/rpc.js')]?: number }} */ +/** @satisfies {{ [k in keyof typeof import('./generated/rpc.js')]?: number }} */ const MESSAGE_TYPES = { Invite: 0, InviteResponse: 1, @@ -24,10 +24,19 @@ const MESSAGE_TYPES = { } const MESSAGES_MAX_ID = Math.max.apply(null, [...Object.values(MESSAGE_TYPES)]) -/** @typedef {Peer['info']} PeerInfoInternal */ -/** @typedef {Omit & { status: Exclude }} PeerInfo */ -/** @typedef {'connecting' | 'connected' | 'disconnected'} PeerState */ -/** @typedef {import('type-fest').SetNonNullable} InviteWithKeys */ +/** + * @typedef {object} PeerInfoBase + * @property {string} deviceId + * @property {string | undefined} name + */ +/** @typedef {PeerInfoBase & { status: 'connecting' }} PeerInfoConnecting */ +/** @typedef {PeerInfoBase & { status: 'connected', connectedAt: number, protomux: Protomux }} PeerInfoConnected */ +/** @typedef {PeerInfoBase & { status: 'disconnected', disconnectedAt: number }} PeerInfoDisconnected */ + +/** @typedef {PeerInfoConnecting | PeerInfoConnected | PeerInfoDisconnected} PeerInfoInternal */ +/** @typedef {PeerInfoConnected | PeerInfoDisconnected} PeerInfo */ +/** @typedef {PeerInfoInternal['status']} PeerState */ +/** @typedef {import('type-fest').SetNonNullable} InviteWithKeys */ /** * @template ValueType @@ -44,6 +53,12 @@ class Peer { #connected /** @type {Map>>} */ pendingInvites = new Map() + /** @type {string | undefined} */ + #name + #connectedAt = 0 + #disconnectedAt = 0 + /** @type {Protomux} */ + #protomux /** * @param {object} options @@ -55,40 +70,65 @@ class Peer { this.#channel = channel this.#connected = pDefer() } + /** @returns {PeerInfoInternal} */ get info() { - return { - status: this.#state, - id: keyToId(this.#publicKey), - } - } - /** - * Poor-man's finite state machine. Rather than a `setState` method, only - * allows specific transitions between states. - * - * @param {'connect' | 'disconnect'} type - */ - action(type) { - switch (type) { - case 'connect': - /* c8 ignore next 3 */ - if (this.#state !== 'connecting') { - return // TODO: report error - this should not happen + const deviceId = keyToId(this.#publicKey) + switch (this.#state) { + case 'connecting': + return { + status: this.#state, + deviceId, + name: this.#name, } - this.#state = 'connected' - this.#connected.resolve() - break - case 'disconnect': - /* c8 ignore next */ - if (this.#state === 'disconnected') return - this.#state = 'disconnected' - for (const pending of this.pendingInvites.values()) { - for (const { reject } of pending) { - reject(new PeerDisconnectedError()) - } + case 'connected': + return { + status: this.#state, + deviceId, + name: this.#name, + connectedAt: this.#connectedAt, + protomux: this.#protomux, } - this.pendingInvites.clear() - break + case 'disconnected': + return { + status: this.#state, + deviceId, + name: this.#name, + disconnectedAt: this.#disconnectedAt, + } + /* c8 ignore next 4 */ + default: { + /** @type {never} */ + const _exhaustiveCheck = this.#state + return _exhaustiveCheck + } + } + } + /** @param {Protomux} protomux */ + connect(protomux) { + this.#protomux = protomux + /* c8 ignore next 3 */ + if (this.#state !== 'connecting') { + return // TODO: report error - this should not happen } + this.#state = 'connected' + this.#connectedAt = Date.now() + this.#connected.resolve() + } + disconnect() { + // @ts-ignore - easier to ignore this than handle this for TS - avoids holding a reference to old Protomux instances + this.#protomux = undefined + /* c8 ignore next */ + if (this.#state === 'disconnected') return + this.#state = 'disconnected' + this.#disconnectedAt = Date.now() + // Can just resolve this rather than reject, because #assertConnected will throw the error + this.#connected.resolve() + for (const pending of this.pendingInvites.values()) { + for (const { reject } of pending) { + reject(new PeerDisconnectedError()) + } + } + this.pendingInvites.clear() } /** @param {InviteWithKeys} invite */ async sendInvite(invite) { @@ -111,6 +151,10 @@ class Peer { const messageType = MESSAGE_TYPES.DeviceInfo this.#channel.messages[messageType].send(buf) } + /** @param {DeviceInfo} deviceInfo */ + receiveDeviceInfo(deviceInfo) { + this.#name = deviceInfo.name + } async #assertConnected() { await this.#connected.promise if (this.#state === 'connected' && !this.#channel.closed) return @@ -120,23 +164,18 @@ class Peer { } /** - * @typedef {object} MapeoRPCEvents + * @typedef {object} LocalPeersEvents * @property {(peers: PeerInfo[]) => void} peers Emitted whenever the connection status of peers changes. An array of peerInfo objects with a peer id and the peer connection status * @property {(peerId: string, invite: InviteWithKeys) => void} invite Emitted when an invite is received - * @property {(deviceInfo: DeviceInfo & { deviceId: string }) => void} device-info Emitted when we receive device info for a device */ -/** @extends {TypedEmitter} */ -export class MapeoRPC extends TypedEmitter { +/** @extends {TypedEmitter} */ +export class LocalPeers extends TypedEmitter { /** @type {Map} */ #peers = new Map() /** @type {Set>} */ #opening = new Set() - constructor() { - super() - } - static InviteResponse = InviteResponse_Decision /** @@ -221,7 +260,8 @@ export class MapeoRPC extends TypedEmitter { /** * Connect to a peer over an existing NoiseSecretStream * - * @param {import('../types.js').NoiseStream | import('../types.js').ProtocolStream} stream a NoiseSecretStream from @hyperswarm/secret-stream + * @param {import('./types.js').NoiseStream} stream a NoiseSecretStream from @hyperswarm/secret-stream + * @returns {import('./types.js').ReplicationStream} */ connect(stream) { if (!stream.noiseStream) throw new Error('Invalid stream') @@ -229,8 +269,12 @@ export class MapeoRPC extends TypedEmitter { stream.userData && Protomux.isProtomux(stream.userData) ? stream.userData : Protomux.from(stream) + stream.userData = protomux this.#opening.add(stream.opened) + // No need to connect error handler to stream because Protomux does this, + // and errors are eventually handled by #closePeer + // noiseSecretStream.remotePublicKey can be null before the stream has // opened, so this helped awaits the open openedNoiseSecretStream(stream).then((stream) => { @@ -254,7 +298,7 @@ export class MapeoRPC extends TypedEmitter { userData: null, protocol: PROTOCOL_NAME, messages, - onopen: this.#openPeer.bind(this, remotePublicKey), + onopen: this.#openPeer.bind(this, remotePublicKey, protomux), onclose: this.#closePeer.bind(this, remotePublicKey), }) channel.open() @@ -263,27 +307,28 @@ export class MapeoRPC extends TypedEmitter { const existingPeer = this.#peers.get(peerId) /* c8 ignore next 3 */ if (existingPeer && existingPeer.info.status !== 'disconnected') { - existingPeer.action('disconnect') // Should not happen, but in case + existingPeer.disconnect() // Should not happen, but in case } const peer = new Peer({ publicKey: remotePublicKey, channel }) this.#peers.set(peerId, peer) // Do not emit peers now - will emit when connected }) - return stream + return stream.rawStream } - /** @param {Buffer} publicKey */ - #openPeer(publicKey) { + /** + * @param {Buffer} publicKey + * @param {Protomux} protomux + */ + #openPeer(publicKey, protomux) { const peerId = keyToId(publicKey) const peer = this.#peers.get(peerId) /* c8 ignore next */ if (!peer) return // TODO: report error - this should not happen - // No-op if no change in state - /* c8 ignore next */ - if (peer.info.status === 'connected') return // TODO: report error - this should not happen - peer.action('connect') - this.#emitPeers() + const wasConnected = peer.info.status === 'connected' + peer.connect(protomux) + if (!wasConnected) this.#emitPeers() } /** @param {Buffer} publicKey */ @@ -296,7 +341,7 @@ export class MapeoRPC extends TypedEmitter { /* c8 ignore next */ if (peer.info.status === 'disconnected') return // TODO: Track reasons for closing - peer.action('disconnect') + peer.disconnect() this.#emitPeers() } @@ -348,7 +393,8 @@ export class MapeoRPC extends TypedEmitter { } case 'DeviceInfo': { const deviceInfo = DeviceInfo.decode(value) - this.emit('device-info', { ...deviceInfo, deviceId: peerId }) + peer.receiveDeviceInfo(deviceInfo) + this.#emitPeers() break } /* c8 ignore next 5 */ diff --git a/src/mapeo-manager.js b/src/mapeo-manager.js index 23842de7e..32965a237 100644 --- a/src/mapeo-manager.js +++ b/src/mapeo-manager.js @@ -22,7 +22,7 @@ import { projectKeyToPublicId, } from './utils.js' import { RandomAccessFilePool } from './core-manager/random-access-file-pool.js' -import { MapeoRPC } from './rpc/index.js' +import { LocalPeers } from './local-peers.js' import { InviteApi } from './invite-api.js' /** @typedef {import("@mapeo/schema").ProjectSettingsValue} ProjectValue */ @@ -69,7 +69,7 @@ export class MapeoManager { migrationsFolder: new URL('../drizzle/client', import.meta.url).pathname, }) - this.#rpc = new MapeoRPC() + this.#rpc = new LocalPeers() this.#keyManager = new KeyManager(rootKey) this.#deviceId = getDeviceId(this.#keyManager) this.#projectSettingsIndexWriter = new IndexWriter({ diff --git a/src/mapeo-project.js b/src/mapeo-project.js index dd5bd3992..ed24cbc50 100644 --- a/src/mapeo-project.js +++ b/src/mapeo-project.js @@ -67,7 +67,7 @@ export class MapeoProject { * @param {import('drizzle-orm/better-sqlite3').BetterSQLite3Database} opts.sharedDb * @param {IndexWriter} opts.sharedIndexWriter * @param {import('./types.js').CoreStorage} opts.coreStorage Folder to store all hypercore data - * @param {import('./rpc/index.js').MapeoRPC} opts.rpc + * @param {import('./local-peers.js').LocalPeers} opts.rpc * */ constructor({ diff --git a/src/member-api.js b/src/member-api.js index d79b22016..1d2f490d2 100644 --- a/src/member-api.js +++ b/src/member-api.js @@ -22,7 +22,7 @@ export class MemberApi extends TypedEmitter { * @param {import('./core-ownership.js').CoreOwnership} opts.coreOwnership * @param {import('./generated/keys.js').EncryptionKeys} opts.encryptionKeys * @param {Buffer} opts.projectKey - * @param {import('./rpc/index.js').MapeoRPC} opts.rpc + * @param {import('./local-peers.js').LocalPeers} opts.rpc * @param {Object} opts.dataTypes * @param {Pick} opts.dataTypes.deviceInfo * @param {Pick} opts.dataTypes.project diff --git a/src/utils.js b/src/utils.js index 92c0568f6..07fc064ea 100644 --- a/src/utils.js +++ b/src/utils.js @@ -46,9 +46,12 @@ export function truncateId(keyOrId, length = 3) { return keyToId(keyOrId).slice(0, length) } -/** @typedef {import('@hyperswarm/secret-stream')} NoiseStream */ +/** @typedef {import('@hyperswarm/secret-stream')} NoiseStream */ /** @typedef {NoiseStream & { destroyed: true }} DestroyedNoiseStream */ -/** @typedef {NoiseStream & { publicKey: Buffer, remotePublicKey: Buffer, handshake: Buffer }} OpenedNoiseStream */ +/** + * @template {import('node:stream').Duplex | import('streamx').Duplex} [T=import('node:stream').Duplex | import('streamx').Duplex] + * @typedef {import('@hyperswarm/secret-stream') & { publicKey: Buffer, remotePublicKey: Buffer, handshake: Buffer }} OpenedNoiseStream + */ /** * Utility to await a NoiseSecretStream to open, that returns a stream with the diff --git a/test-e2e/manager-invite.js b/test-e2e/manager-invite.js index 437ba4297..4a0b866ce 100644 --- a/test-e2e/manager-invite.js +++ b/test-e2e/manager-invite.js @@ -5,7 +5,7 @@ import RAM from 'random-access-memory' import { MEMBER_ROLE_ID } from '../src/capabilities.js' import { InviteResponse_Decision } from '../src/generated/rpc.js' import { MapeoManager, kRPC } from '../src/mapeo-manager.js' -import { replicate } from '../tests/helpers/rpc.js' +import { replicate } from '../tests/helpers/local-peers.js' test('member invite accepted', async (t) => { t.plan(10) @@ -25,7 +25,7 @@ test('member invite accepted', async (t) => { creator[kRPC].on('peers', async (peers) => { t.is(peers.length, 1) - const response = await creatorProject.$member.invite(peers[0].id, { + const response = await creatorProject.$member.invite(peers[0].deviceId, { roleId: MEMBER_ROLE_ID, }) @@ -52,7 +52,7 @@ test('member invite accepted', async (t) => { joiner[kRPC].on('peers', (peers) => { t.is(peers.length, 1) - expectedInvitorPeerId = peers[0].id + expectedInvitorPeerId = peers[0].deviceId }) joiner.invite.on('invite-received', async (invite) => { @@ -119,7 +119,7 @@ test('member invite rejected', async (t) => { creator[kRPC].on('peers', async (peers) => { t.is(peers.length, 1) - const response = await creatorProject.$member.invite(peers[0].id, { + const response = await creatorProject.$member.invite(peers[0].deviceId, { roleId: MEMBER_ROLE_ID, }) @@ -146,7 +146,7 @@ test('member invite rejected', async (t) => { joiner[kRPC].on('peers', (peers) => { t.is(peers.length, 1) - expectedInvitorPeerId = peers[0].id + expectedInvitorPeerId = peers[0].deviceId }) joiner.invite.on('invite-received', async (invite) => { diff --git a/test-e2e/members.js b/test-e2e/members.js index cbef3fef8..64c28dee9 100644 --- a/test-e2e/members.js +++ b/test-e2e/members.js @@ -11,7 +11,7 @@ import { MEMBER_ROLE_ID, NO_ROLE_CAPABILITIES, } from '../src/capabilities.js' -import { replicate } from '../tests/helpers/rpc.js' +import { replicate } from '../tests/helpers/local-peers.js' test('getting yourself after creating project', async (t) => { const { manager } = setup() @@ -191,7 +191,7 @@ function setup() { }) manager[kRPC].on('peers', (peers) => { - const deviceId = peers[0].id + const deviceId = peers[0].deviceId project.$member .invite(deviceId, { roleId }) .then(() => deferred.resolve(deviceId)) diff --git a/test-types/data-types.ts b/test-types/data-types.ts index f93833ad4..10b2fcb15 100644 --- a/test-types/data-types.ts +++ b/test-types/data-types.ts @@ -14,7 +14,7 @@ import { drizzle } from 'drizzle-orm/better-sqlite3' import RAM from 'random-access-memory' import { IndexWriter } from '../dist/index-writer/index.js' import { projectSettingsTable } from '../dist/schema/client.js' -import { MapeoRPC } from '../dist/rpc/index.js' +import { LocalPeers } from '../dist/local-peers.js' import { Expect, type Equal } from './utils.js' type Forks = { forks: string[] } @@ -36,7 +36,7 @@ const mapeoProject = new MapeoProject({ tables: [projectSettingsTable], sqlite, }), - rpc: new MapeoRPC(), + rpc: new LocalPeers(), }) ///// Observations diff --git a/tests/helpers/rpc.js b/tests/helpers/local-peers.js similarity index 78% rename from tests/helpers/rpc.js rename to tests/helpers/local-peers.js index 6858e1f1f..1b7a8ce84 100644 --- a/tests/helpers/rpc.js +++ b/tests/helpers/local-peers.js @@ -5,10 +5,9 @@ import NoiseSecretStream from '@hyperswarm/secret-stream' */ /** - * @param {import('../../src/rpc/index.js').MapeoRPC} rpc1 - * @param {import('../../src/rpc/index.js').MapeoRPC} rpc2 + * @param {import('../../src/local-peers.js').LocalPeers} rpc1 + * @param {import('../../src/local-peers.js').LocalPeers} rpc2 * @param { {kp1?: KeyPair, kp2?: KeyPair} } [keyPairs] - * @returns {() => Promise<[void, void]>} */ export function replicate( rpc1, @@ -29,25 +28,24 @@ export function replicate( // @ts-expect-error n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream) - // @ts-expect-error rpc1.connect(n1) - // @ts-expect-error rpc2.connect(n2) - return async function destroy() { + /** @param {Error} [e] */ + return async function destroy(e) { return Promise.all([ /** @type {Promise} */ ( new Promise((res) => { n1.on('close', res) - n1.destroy() + n1.destroy(e) }) ), /** @type {Promise} */ ( new Promise((res) => { n2.on('close', res) - n2.destroy() + n2.destroy(e) }) ), ]) diff --git a/tests/invite-api.js b/tests/invite-api.js index de24e127a..010a955fc 100644 --- a/tests/invite-api.js +++ b/tests/invite-api.js @@ -1,10 +1,10 @@ import test from 'brittle' import { randomBytes } from 'crypto' import { KeyManager } from '@mapeo/crypto' -import { MapeoRPC } from '../src/rpc/index.js' +import { LocalPeers } from '../src/local-peers.js' import { InviteApi } from '../src/invite-api.js' import { projectKeyToPublicId } from '../src/utils.js' -import { replicate } from './helpers/rpc.js' +import { replicate } from './helpers/local-peers.js' import NoiseSecretStream from '@hyperswarm/secret-stream' import pDefer from 'p-defer' @@ -15,7 +15,7 @@ test('invite-received event has expected payload', async (t) => { const projects = new Map() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -33,13 +33,13 @@ test('invite-received event has expected payload', async (t) => { r2.on('peers', (peers) => { t.is(peers.length, 1) - expectedInvitorPeerId = peers[0].id + expectedInvitorPeerId = peers[0].deviceId }) r1.on('peers', (peers) => { t.is(peers.length, 1) - r1.invite(peers[0].id, { + r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, projectInfo: { name: 'Mapeo' }, @@ -65,7 +65,7 @@ test('Accept invite', async (t) => { const projects = new Map() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -83,12 +83,12 @@ test('Accept invite', async (t) => { r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) - t.is(response, MapeoRPC.InviteResponse.ACCEPT) + t.is(response, LocalPeers.InviteResponse.ACCEPT) }) inviteApi.on('invite-received', async ({ projectId }) => { @@ -109,7 +109,7 @@ test('Reject invite', async (t) => { const projects = new Map() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -127,12 +127,12 @@ test('Reject invite', async (t) => { r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) - t.is(response, MapeoRPC.InviteResponse.REJECT) + t.is(response, LocalPeers.InviteResponse.REJECT) }) inviteApi.on('invite-received', async ({ projectId }) => { @@ -152,7 +152,7 @@ test('Receiving invite for project that peer already belongs to', async (t) => { const { rpc: r1, projectKey, encryptionKeys } = setup() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -169,14 +169,14 @@ test('Receiving invite for project that peer already belongs to', async (t) => { r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) t.is( response, - MapeoRPC.InviteResponse.ALREADY, + LocalPeers.InviteResponse.ALREADY, 'invited peer automatically responds with "ALREADY"' ) }) @@ -195,7 +195,7 @@ test('Receiving invite for project that peer already belongs to', async (t) => { const { rpc: r1, projectKey, encryptionKeys } = setup() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() let isMember = false const inviteApi = new InviteApi({ @@ -213,14 +213,14 @@ test('Receiving invite for project that peer already belongs to', async (t) => { r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) t.is( response, - MapeoRPC.InviteResponse.ALREADY, + LocalPeers.InviteResponse.ALREADY, 'invited peer automatically responds with "ALREADY"' ) }) @@ -242,7 +242,7 @@ test('Receiving invite for project that peer already belongs to', async (t) => { const projects = new Map() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -257,19 +257,19 @@ test('Receiving invite for project that peer already belongs to', async (t) => { }) r1.on('peers', async (peers) => { - const response1 = await r1.invite(peers[0].id, { + const response1 = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) - t.is(response1, MapeoRPC.InviteResponse.ACCEPT) + t.is(response1, LocalPeers.InviteResponse.ACCEPT) - const response2 = await r1.invite(peers[0].id, { + const response2 = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) - t.is(response2, MapeoRPC.InviteResponse.ALREADY) + t.is(response2, LocalPeers.InviteResponse.ALREADY) }) let inviteReceivedEventCount = 0 @@ -286,7 +286,7 @@ test('Receiving invite for project that peer already belongs to', async (t) => { }) test('trying to accept or reject non-existent invite throws', async (t) => { - const rpc = new MapeoRPC() + const rpc = new LocalPeers() const inviteApi = new InviteApi({ rpc, queries: { @@ -307,7 +307,7 @@ test('invitor disconnecting results in accept throwing', async (t) => { const { rpc: r1, projectKey, encryptionKeys } = setup() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -322,7 +322,7 @@ test('invitor disconnecting results in accept throwing', async (t) => { r1.on('peers', async (peers) => { if (peers.length !== 1 || peers[0].status === 'disconnected') return await t.exception(() => { - return r1.invite(peers[0].id, { + return r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) @@ -345,7 +345,7 @@ test('invitor disconnecting results in invite reject response not throwing', asy const { rpc: r1, projectKey, encryptionKeys } = setup() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -359,7 +359,7 @@ test('invitor disconnecting results in invite reject response not throwing', asy if (peers.length !== 1 || peers[0].status === 'disconnected') return await t.exception(() => { - return r1.invite(peers[0].id, { + return r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) @@ -381,7 +381,7 @@ test('invitor disconnecting results in invite already response not throwing', as const { rpc: r1, projectKey, encryptionKeys } = setup() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() let isMember = false @@ -399,7 +399,7 @@ test('invitor disconnecting results in invite already response not throwing', as if (peers.length !== 1 || peers[0].status === 'disconnected') return await t.exception(() => { - return r1.invite(peers[0].id, { + return r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) @@ -422,7 +422,7 @@ test('addProject throwing results in invite accept throwing', async (t) => { const { rpc: r1, projectKey, encryptionKeys } = setup() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -435,7 +435,7 @@ test('addProject throwing results in invite accept throwing', async (t) => { }) r1.on('peers', (peers) => { - r1.invite(peers[0].id, { + r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) @@ -455,7 +455,7 @@ test('Invite from multiple peers', async (t) => { t.plan(5 + invitorCount) const { projectKey, encryptionKeys } = setup() - const invitee = new MapeoRPC() + const invitee = new LocalPeers() const inviteeKeyPair = NoiseSecretStream.keyPair() const projects = new Map() @@ -492,19 +492,19 @@ test('Invite from multiple peers', async (t) => { }) for (let i = 0; i < invitorCount; i++) { - const invitor = new MapeoRPC() + const invitor = new LocalPeers() const keyPair = NoiseSecretStream.keyPair() invitor.on('peers', async (peers) => { if (++connected === invitorCount) deferred.resolve() - const response = await invitor.invite(peers[0].id, { + const response = await invitor.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) if (first === keyPair.publicKey.toString('hex')) { t.pass('One invitor did receive accept response') - t.is(response, MapeoRPC.InviteResponse.ACCEPT, 'accept response') + t.is(response, LocalPeers.InviteResponse.ACCEPT, 'accept response') } else { - t.is(response, MapeoRPC.InviteResponse.ALREADY, 'already response') + t.is(response, LocalPeers.InviteResponse.ALREADY, 'already response') } }) replicate(invitee, invitor, { kp1: inviteeKeyPair, kp2: keyPair }) @@ -517,7 +517,7 @@ test.skip('Invite from multiple peers, first disconnects before accepted, receiv t.plan(8 + invitorCount) const { projectKey, encryptionKeys } = setup() - const invitee = new MapeoRPC() + const invitee = new LocalPeers() const inviteeKeyPair = NoiseSecretStream.keyPair() const projects = new Map() @@ -562,22 +562,22 @@ test.skip('Invite from multiple peers, first disconnects before accepted, receiv }) for (let i = 0; i < invitorCount; i++) { - const invitor = new MapeoRPC() + const invitor = new LocalPeers() const keyPair = NoiseSecretStream.keyPair() const invitorId = keyPair.publicKey.toString('hex') invitor.on('peers', async (peers) => { if (peers[0].status !== 'connected') return if (++connected === invitorCount) deferred.resolve() try { - const response = await invitor.invite(peers[0].id, { + const response = await invitor.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) if (invitorId === invitesReceived[1]) { t.pass('One invitor did receive accept response') - t.is(response, MapeoRPC.InviteResponse.ACCEPT, 'accept response') + t.is(response, LocalPeers.InviteResponse.ACCEPT, 'accept response') } else { - t.is(response, MapeoRPC.InviteResponse.ALREADY, 'already response') + t.is(response, LocalPeers.InviteResponse.ALREADY, 'already response') } } catch (e) { t.is( @@ -598,7 +598,7 @@ test.skip('Invite from multiple peers, first disconnects before accepted, receiv function setup() { const encryptionKeys = { auth: randomBytes(32) } const projectKey = KeyManager.generateProjectKeypair().publicKey - const rpc = new MapeoRPC() + const rpc = new LocalPeers() return { rpc, diff --git a/tests/rpc.js b/tests/local-peers.js similarity index 69% rename from tests/rpc.js rename to tests/local-peers.js index 83cd90698..dc704f8a4 100644 --- a/tests/rpc.js +++ b/tests/local-peers.js @@ -1,39 +1,40 @@ // @ts-check import test from 'brittle' import { - MapeoRPC, + LocalPeers, PeerDisconnectedError, TimeoutError, UnknownPeerError, -} from '../src/rpc/index.js' +} from '../src/local-peers.js' import FakeTimers from '@sinonjs/fake-timers' import { once } from 'events' import { Duplex } from 'streamx' -import { replicate } from './helpers/rpc.js' +import { replicate } from './helpers/local-peers.js' import { randomBytes } from 'node:crypto' import NoiseSecretStream from '@hyperswarm/secret-stream' +import Protomux from 'protomux' test('Send invite and accept', async (t) => { t.plan(3) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }) - t.is(response, MapeoRPC.InviteResponse.ACCEPT) + t.is(response, LocalPeers.InviteResponse.ACCEPT) }) r2.on('invite', (peerId, invite) => { t.ok(invite.projectKey.equals(projectKey), 'invite project key correct') r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }) }) @@ -41,8 +42,8 @@ test('Send invite and accept', async (t) => { }) test('Send invite immediately', async (t) => { - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) @@ -62,33 +63,33 @@ test('Send invite immediately', async (t) => { r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }) - t.is(await responsePromise, MapeoRPC.InviteResponse.ACCEPT) + t.is(await responsePromise, LocalPeers.InviteResponse.ACCEPT) }) test('Send invite and reject', async (t) => { t.plan(3) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }) - t.is(response, MapeoRPC.InviteResponse.REJECT) + t.is(response, LocalPeers.InviteResponse.REJECT) }) r2.on('invite', (peerId, invite) => { t.ok(invite.projectKey.equals(projectKey), 'invite project key correct') r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.REJECT, + decision: LocalPeers.InviteResponse.REJECT, }) }) @@ -96,8 +97,8 @@ test('Send invite and reject', async (t) => { }) test('Invite to unknown peer', async (t) => { - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) const unknownPeerId = Buffer.allocUnsafe(32).fill(1).toString('hex') @@ -115,7 +116,7 @@ test('Invite to unknown peer', async (t) => { () => r2.inviteResponse(unknownPeerId, { projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }), UnknownPeerError ) @@ -123,25 +124,25 @@ test('Invite to unknown peer', async (t) => { test('Send invite and already on project', async (t) => { t.plan(3) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }) - t.is(response, MapeoRPC.InviteResponse.ALREADY) + t.is(response, LocalPeers.InviteResponse.ALREADY) }) r2.on('invite', (peerId, invite) => { t.ok(invite.projectKey.equals(projectKey), 'invite project key correct') r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ALREADY, + decision: LocalPeers.InviteResponse.ALREADY, }) }) @@ -150,8 +151,8 @@ test('Send invite and already on project', async (t) => { test('Send invite with encryption key', async (t) => { t.plan(4) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) const encryptionKeys = { @@ -161,11 +162,11 @@ test('Send invite with encryption key', async (t) => { r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) - t.is(response, MapeoRPC.InviteResponse.ACCEPT) + t.is(response, LocalPeers.InviteResponse.ACCEPT) }) r2.on('invite', (peerId, invite) => { @@ -177,7 +178,7 @@ test('Send invite with encryption key', async (t) => { ) r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }) }) @@ -186,20 +187,20 @@ test('Send invite with encryption key', async (t) => { test('Send invite with project info', async (t) => { t.plan(4) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) const projectInfo = { name: 'MyProject' } r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, projectInfo, encryptionKeys: { auth: randomBytes(32) }, }) - t.is(response, MapeoRPC.InviteResponse.ACCEPT) + t.is(response, LocalPeers.InviteResponse.ACCEPT) }) r2.on('invite', (peerId, invite) => { @@ -207,7 +208,7 @@ test('Send invite with project info', async (t) => { t.alike(invite.projectInfo, projectInfo, 'project info is sent with invite') r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }) }) @@ -216,8 +217,8 @@ test('Send invite with project info', async (t) => { test('Disconnected peer shows in state', async (t) => { t.plan(6) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() let peerStateUpdates = 0 r1.on('peers', async (peers) => { @@ -225,7 +226,7 @@ test('Disconnected peer shows in state', async (t) => { if (peers[0].status === 'connected') { t.pass('peer appeared as connected') t.is(++peerStateUpdates, 1) - destroy() + destroy(new Error()) } else { t.pass('peer appeared as disconnected') t.is(++peerStateUpdates, 2) @@ -235,16 +236,26 @@ test('Disconnected peer shows in state', async (t) => { const destroy = replicate(r1, r2) }) +test('next tick disconnect does not throw', async (t) => { + const r1 = new LocalPeers() + const r2 = new LocalPeers() + + const destroy = replicate(r1, r2) + await Promise.resolve() + destroy(new Error()) + t.pass() +}) + test('Disconnect results in rejected invite', async (t) => { t.plan(2) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) r1.on('peers', async (peers) => { if (peers[0].status === 'connected') { - const invite = r1.invite(peers[0].id, { + const invite = r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }) @@ -268,9 +279,9 @@ test('Disconnect results in rejected invite', async (t) => { test('Invite to multiple peers', async (t) => { // This is catches not tracking invites by peer t.plan(2) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() - const r3 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() + const r3 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) @@ -279,7 +290,7 @@ test('Invite to multiple peers', async (t) => { t.pass('connected to two peers') const responses = await Promise.all( peers.map((peer) => - r1.invite(peer.id, { + r1.invite(peer.deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }) @@ -287,7 +298,7 @@ test('Invite to multiple peers', async (t) => { ) t.alike( responses.sort(), - [MapeoRPC.InviteResponse.ACCEPT, MapeoRPC.InviteResponse.REJECT], + [LocalPeers.InviteResponse.ACCEPT, LocalPeers.InviteResponse.REJECT], 'One peer accepted, one rejected' ) }) @@ -295,14 +306,14 @@ test('Invite to multiple peers', async (t) => { r2.on('invite', (peerId, invite) => { r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }) }) r3.on('invite', (peerId, invite) => { r3.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.REJECT, + decision: LocalPeers.InviteResponse.REJECT, }) }) @@ -314,27 +325,27 @@ test('Invite to multiple peers', async (t) => { test('Multiple invites to a peer, only one response', async (t) => { t.plan(2) let count = 0 - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) r1.on('peers', async (peers) => { const responses = await Promise.all([ - r1.invite(peers[0].id, { + r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }), - r1.invite(peers[0].id, { + r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }), - r1.invite(peers[0].id, { + r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }), ]) - const expected = Array(3).fill(MapeoRPC.InviteResponse.ACCEPT) + const expected = Array(3).fill(LocalPeers.InviteResponse.ACCEPT) t.alike(responses, expected) }) @@ -344,7 +355,7 @@ test('Multiple invites to a peer, only one response', async (t) => { t.is(count, 3) r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }) }) @@ -356,13 +367,13 @@ test('Default: invites do not timeout', async (t) => { t.teardown(() => clock.uninstall()) t.plan(1) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) r1.once('peers', async (peers) => { - r1.invite(peers[0].id, { + r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }).then( @@ -381,20 +392,21 @@ test('Invite timeout', async (t) => { t.teardown(() => clock.uninstall()) t.plan(1) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) r1.once('peers', async (peers) => { t.exception( - r1.invite(peers[0].id, { + r1.invite(peers[0].deviceId, { projectKey, - timeout: 5000, + timeout: 1000, encryptionKeys: { auth: randomBytes(32) }, }), TimeoutError ) + // Not working right now, because of the new async code clock.tick(5001) }) @@ -402,8 +414,8 @@ test('Invite timeout', async (t) => { }) test('Reconnect peer and send invite', async (t) => { - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) @@ -418,7 +430,7 @@ test('Reconnect peer and send invite', async (t) => { t.ok(invite.projectKey.equals(projectKey), 'invite project key correct') r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }) }) @@ -426,45 +438,45 @@ test('Reconnect peer and send invite', async (t) => { const [peers] = await once(r1, 'peers') t.is(r1.peers.length, 1) t.is(peers[0].status, 'connected') - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }) - t.is(response, MapeoRPC.InviteResponse.ACCEPT) + t.is(response, LocalPeers.InviteResponse.ACCEPT) }) test('invalid stream', (t) => { - const r1 = new MapeoRPC() + const r1 = new LocalPeers() const regularStream = new Duplex() - // @ts-expect-error t.exception(() => r1.connect(regularStream), 'Invalid stream') }) test('Send device info', async (t) => { - t.plan(3) - - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() /** @type {import('../src/generated/rpc.js').DeviceInfo} */ const expectedDeviceInfo = { name: 'mapeo' } r1.on('peers', async (peers) => { t.is(peers.length, 1) - r1.sendDeviceInfo(peers[0].id, expectedDeviceInfo) - }) - - r2.on('device-info', ({ deviceId, ...deviceInfo }) => { - t.ok(deviceId) - t.alike(deviceInfo, expectedDeviceInfo) + r1.sendDeviceInfo(peers[0].deviceId, expectedDeviceInfo) }) replicate(r1, r2) + + await new Promise((res) => { + r2.on('peers', (peers) => { + if (!(peers.length === 1 && peers[0].name)) return + t.is(peers[0].name, expectedDeviceInfo.name) + res(true) + }) + }) }) test('Send device info immediately', async (t) => { - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() /** @type {import('../src/generated/rpc.js').DeviceInfo} */ const expectedDeviceInfo = { name: 'mapeo' } @@ -476,16 +488,18 @@ test('Send device info immediately', async (t) => { r1.sendDeviceInfo(kp2.publicKey.toString('hex'), expectedDeviceInfo) - const [{ deviceId, ...deviceInfo }] = await once(r2, 'device-info') - t.ok(deviceId) - t.alike(deviceInfo, expectedDeviceInfo) + await new Promise((res) => { + r2.on('peers', (peers) => { + if (!(peers.length === 1 && peers[0].name)) return + t.is(peers[0].name, expectedDeviceInfo.name) + res(true) + }) + }) }) test('Reconnect peer and send device info', async (t) => { - t.plan(6) - - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() /** @type {import('../src/generated/rpc.js').DeviceInfo} */ const expectedDeviceInfo = { name: 'mapeo' } @@ -497,16 +511,23 @@ test('Reconnect peer and send device info', async (t) => { t.is(r1.peers.length, 1) t.is(r1.peers[0].status, 'disconnected') - r2.on('device-info', ({ deviceId, ...deviceInfo }) => { - t.ok(deviceId) - t.alike(deviceInfo, expectedDeviceInfo) - }) - replicate(r1, r2) - const [peers] = await once(r1, 'peers') + const [r1peers] = await once(r1, 'peers') t.is(r1.peers.length, 1) - t.is(peers[0].status, 'connected') + t.is(r1peers[0].status, 'connected') + + r1.sendDeviceInfo(r1peers[0].deviceId, expectedDeviceInfo) - r1.sendDeviceInfo(peers[0].id, expectedDeviceInfo) + const [r2Peers] = await once(r2, 'peers') + t.is(r2Peers[0].name, expectedDeviceInfo.name) +}) + +test('connected peer has protomux instance', async (t) => { + const r1 = new LocalPeers() + const r2 = new LocalPeers() + replicate(r1, r2) + const [[peer]] = await once(r1, 'peers') + t.is(peer.status, 'connected') + t.ok(Protomux.isProtomux(peer.protomux)) })