From d687f1d7d926f12457ed9d0d47a40778fb7bff89 Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Thu, 26 Oct 2023 15:34:10 +0900 Subject: [PATCH 1/9] WIP initial work --- src/discovery/local-discovery.js | 17 +++++++++++------ src/mapeo-manager.js | 26 ++++++++++++++++++++++++++ src/rpc/index.js | 2 +- src/utils.js | 7 +++++-- 4 files changed, 43 insertions(+), 9 deletions(-) diff --git a/src/discovery/local-discovery.js b/src/discovery/local-discovery.js index 33ec0863c..6c1ade7bb 100644 --- a/src/discovery/local-discovery.js +++ b/src/discovery/local-discovery.js @@ -10,12 +10,13 @@ import pTimeout from 'p-timeout' import { keyToPublicId } from '@mapeo/crypto' /** @typedef {{ publicKey: Buffer, secretKey: Buffer }} Keypair */ +/** @typedef {import('../utils.js').OpenedNoiseStream} OpenedNoiseStream */ export const ERR_DUPLICATE = 'Duplicate connection' /** * @typedef {Object} DiscoveryEvents - * @property {(connection: import('@hyperswarm/secret-stream')) => void} connection + * @property {(connection: OpenedNoiseStream) => void} connection */ /** @@ -24,7 +25,7 @@ export const ERR_DUPLICATE = 'Duplicate connection' export class LocalDiscovery extends TypedEmitter { #identityKeypair #server - /** @type {Map>} */ + /** @type {Map} */ #noiseConnections = new Map() #dnssd #sm @@ -142,14 +143,18 @@ export class LocalDiscovery extends TypedEmitter { // Further errors will be handled in #handleNoiseStreamConnection() socket.off('error', onSocketError) secretStream.off('error', this.#handleSocketError) - this.#handleNoiseStreamConnection(secretStream) + this.#handleNoiseStreamConnection( + // We know the NoiseStream is open at this point, so we can coerce the type + /** @type {OpenedNoiseStream} */ + (secretStream) + ) }) } /** * - * @param {NoiseSecretStream} existing - * @param {NoiseSecretStream} keeping + * @param {OpenedNoiseStream} existing + * @param {OpenedNoiseStream} keeping */ #handleConnectionSwap(existing, keeping) { let closed = false @@ -174,7 +179,7 @@ export class LocalDiscovery extends TypedEmitter { /** * - * @param {NoiseSecretStream} conn + * @param {OpenedNoiseStream} conn * @returns */ #handleNoiseStreamConnection(conn) { diff --git a/src/mapeo-manager.js b/src/mapeo-manager.js index 23842de7e..35adcadb2 100644 --- a/src/mapeo-manager.js +++ b/src/mapeo-manager.js @@ -17,6 +17,7 @@ import { ProjectKeys } from './generated/keys.js' import { deNullify, getDeviceId, + keyToId, projectIdToNonce, projectKeyToId, projectKeyToPublicId, @@ -24,6 +25,7 @@ import { import { RandomAccessFilePool } from './core-manager/random-access-file-pool.js' import { MapeoRPC } from './rpc/index.js' import { InviteApi } from './invite-api.js' +import { LocalDiscovery } from './discovery/local-discovery.js' /** @typedef {import("@mapeo/schema").ProjectSettingsValue} ProjectValue */ @@ -50,6 +52,7 @@ export class MapeoManager { #deviceId #rpc #invite + #localDiscovery /** * @param {Object} opts @@ -103,6 +106,17 @@ export class MapeoManager { } else { this.#coreStorage = coreStorage } + + this.#localDiscovery = new LocalDiscovery({ + identityKeypair: this.#keyManager.getIdentityKeypair(), + }) + + this.#localDiscovery.on('connection', (connection) => { + this.#handleDiscoveryConnection(connection).catch((e) => { + // Ignore errors here for now + console.error('Error handling discovery connection', e) + }) + }) } /** @@ -382,6 +396,18 @@ export class MapeoManager { return projectPublicId } + /** + * @param {import('./discovery/local-discovery.js').OpenedNoiseStream} connection + */ + async #handleDiscoveryConnection(connection) { + const peerId = keyToId(connection.remotePublicKey) + this.#rpc.connect(connection) + const { name } = await this.getDeviceInfo() + if (name) { + this.#rpc.sendDeviceInfo(peerId, { name }) + } + } + /** * @template {import('type-fest').Exact} T * @param {T} deviceInfo diff --git a/src/rpc/index.js b/src/rpc/index.js index f3551c104..8bf4fc859 100644 --- a/src/rpc/index.js +++ b/src/rpc/index.js @@ -221,7 +221,7 @@ export class MapeoRPC extends TypedEmitter { /** * Connect to a peer over an existing NoiseSecretStream * - * @param {import('../types.js').NoiseStream | import('../types.js').ProtocolStream} stream a NoiseSecretStream from @hyperswarm/secret-stream + * @param {import('../types.js').NoiseStream | import('../types.js').ProtocolStream} stream a NoiseSecretStream from @hyperswarm/secret-stream */ connect(stream) { if (!stream.noiseStream) throw new Error('Invalid stream') diff --git a/src/utils.js b/src/utils.js index 92c0568f6..07fc064ea 100644 --- a/src/utils.js +++ b/src/utils.js @@ -46,9 +46,12 @@ export function truncateId(keyOrId, length = 3) { return keyToId(keyOrId).slice(0, length) } -/** @typedef {import('@hyperswarm/secret-stream')} NoiseStream */ +/** @typedef {import('@hyperswarm/secret-stream')} NoiseStream */ /** @typedef {NoiseStream & { destroyed: true }} DestroyedNoiseStream */ -/** @typedef {NoiseStream & { publicKey: Buffer, remotePublicKey: Buffer, handshake: Buffer }} OpenedNoiseStream */ +/** + * @template {import('node:stream').Duplex | import('streamx').Duplex} [T=import('node:stream').Duplex | import('streamx').Duplex] + * @typedef {import('@hyperswarm/secret-stream') & { publicKey: Buffer, remotePublicKey: Buffer, handshake: Buffer }} OpenedNoiseStream + */ /** * Utility to await a NoiseSecretStream to open, that returns a stream with the From 74a77e2787e7f64d6e9ec2700e9791097ac7f2b8 Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Thu, 26 Oct 2023 15:38:25 +0900 Subject: [PATCH 2/9] rename Rpc to LocalPeers --- src/invite-api.js | 2 +- src/mapeo-manager.js | 4 +- src/mapeo-project.js | 2 +- src/member-api.js | 2 +- src/rpc/index.js | 2 +- test-types/data-types.ts | 4 +- tests/helpers/rpc.js | 6 +- tests/invite-api.js | 54 +++++++++--------- tests/rpc.js | 115 +++++++++++++++++++-------------------- 9 files changed, 94 insertions(+), 97 deletions(-) diff --git a/src/invite-api.js b/src/invite-api.js index cb871d0aa..630d1f42b 100644 --- a/src/invite-api.js +++ b/src/invite-api.js @@ -47,7 +47,7 @@ export class InviteApi extends TypedEmitter { /** * @param {Object} options - * @param {import('./rpc/index.js').MapeoRPC} options.rpc + * @param {import('./rpc/index.js').LocalPeers} options.rpc * @param {object} options.queries * @param {(projectId: string) => Promise} options.queries.isMember * @param {(invite: import('./generated/rpc.js').Invite) => Promise} options.queries.addProject diff --git a/src/mapeo-manager.js b/src/mapeo-manager.js index 35adcadb2..b44128b8e 100644 --- a/src/mapeo-manager.js +++ b/src/mapeo-manager.js @@ -23,7 +23,7 @@ import { projectKeyToPublicId, } from './utils.js' import { RandomAccessFilePool } from './core-manager/random-access-file-pool.js' -import { MapeoRPC } from './rpc/index.js' +import { LocalPeers } from './rpc/index.js' import { InviteApi } from './invite-api.js' import { LocalDiscovery } from './discovery/local-discovery.js' @@ -72,7 +72,7 @@ export class MapeoManager { migrationsFolder: new URL('../drizzle/client', import.meta.url).pathname, }) - this.#rpc = new MapeoRPC() + this.#rpc = new LocalPeers() this.#keyManager = new KeyManager(rootKey) this.#deviceId = getDeviceId(this.#keyManager) this.#projectSettingsIndexWriter = new IndexWriter({ diff --git a/src/mapeo-project.js b/src/mapeo-project.js index dd5bd3992..8d6af591a 100644 --- a/src/mapeo-project.js +++ b/src/mapeo-project.js @@ -67,7 +67,7 @@ export class MapeoProject { * @param {import('drizzle-orm/better-sqlite3').BetterSQLite3Database} opts.sharedDb * @param {IndexWriter} opts.sharedIndexWriter * @param {import('./types.js').CoreStorage} opts.coreStorage Folder to store all hypercore data - * @param {import('./rpc/index.js').MapeoRPC} opts.rpc + * @param {import('./rpc/index.js').LocalPeers} opts.rpc * */ constructor({ diff --git a/src/member-api.js b/src/member-api.js index d79b22016..6f92659db 100644 --- a/src/member-api.js +++ b/src/member-api.js @@ -22,7 +22,7 @@ export class MemberApi extends TypedEmitter { * @param {import('./core-ownership.js').CoreOwnership} opts.coreOwnership * @param {import('./generated/keys.js').EncryptionKeys} opts.encryptionKeys * @param {Buffer} opts.projectKey - * @param {import('./rpc/index.js').MapeoRPC} opts.rpc + * @param {import('./rpc/index.js').LocalPeers} opts.rpc * @param {Object} opts.dataTypes * @param {Pick} opts.dataTypes.deviceInfo * @param {Pick} opts.dataTypes.project diff --git a/src/rpc/index.js b/src/rpc/index.js index 8bf4fc859..29e2deac0 100644 --- a/src/rpc/index.js +++ b/src/rpc/index.js @@ -127,7 +127,7 @@ class Peer { */ /** @extends {TypedEmitter} */ -export class MapeoRPC extends TypedEmitter { +export class LocalPeers extends TypedEmitter { /** @type {Map} */ #peers = new Map() /** @type {Set>} */ diff --git a/test-types/data-types.ts b/test-types/data-types.ts index f93833ad4..74cef62a9 100644 --- a/test-types/data-types.ts +++ b/test-types/data-types.ts @@ -14,7 +14,7 @@ import { drizzle } from 'drizzle-orm/better-sqlite3' import RAM from 'random-access-memory' import { IndexWriter } from '../dist/index-writer/index.js' import { projectSettingsTable } from '../dist/schema/client.js' -import { MapeoRPC } from '../dist/rpc/index.js' +import { LocalPeers } from '../dist/rpc/index.js' import { Expect, type Equal } from './utils.js' type Forks = { forks: string[] } @@ -36,7 +36,7 @@ const mapeoProject = new MapeoProject({ tables: [projectSettingsTable], sqlite, }), - rpc: new MapeoRPC(), + rpc: new LocalPeers(), }) ///// Observations diff --git a/tests/helpers/rpc.js b/tests/helpers/rpc.js index 6858e1f1f..c735096e5 100644 --- a/tests/helpers/rpc.js +++ b/tests/helpers/rpc.js @@ -5,8 +5,8 @@ import NoiseSecretStream from '@hyperswarm/secret-stream' */ /** - * @param {import('../../src/rpc/index.js').MapeoRPC} rpc1 - * @param {import('../../src/rpc/index.js').MapeoRPC} rpc2 + * @param {import('../../src/rpc/index.js').LocalPeers} rpc1 + * @param {import('../../src/rpc/index.js').LocalPeers} rpc2 * @param { {kp1?: KeyPair, kp2?: KeyPair} } [keyPairs] * @returns {() => Promise<[void, void]>} */ @@ -29,9 +29,7 @@ export function replicate( // @ts-expect-error n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream) - // @ts-expect-error rpc1.connect(n1) - // @ts-expect-error rpc2.connect(n2) return async function destroy() { diff --git a/tests/invite-api.js b/tests/invite-api.js index de24e127a..19f058dbd 100644 --- a/tests/invite-api.js +++ b/tests/invite-api.js @@ -1,7 +1,7 @@ import test from 'brittle' import { randomBytes } from 'crypto' import { KeyManager } from '@mapeo/crypto' -import { MapeoRPC } from '../src/rpc/index.js' +import { LocalPeers } from '../src/rpc/index.js' import { InviteApi } from '../src/invite-api.js' import { projectKeyToPublicId } from '../src/utils.js' import { replicate } from './helpers/rpc.js' @@ -15,7 +15,7 @@ test('invite-received event has expected payload', async (t) => { const projects = new Map() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -65,7 +65,7 @@ test('Accept invite', async (t) => { const projects = new Map() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -88,7 +88,7 @@ test('Accept invite', async (t) => { encryptionKeys, }) - t.is(response, MapeoRPC.InviteResponse.ACCEPT) + t.is(response, LocalPeers.InviteResponse.ACCEPT) }) inviteApi.on('invite-received', async ({ projectId }) => { @@ -109,7 +109,7 @@ test('Reject invite', async (t) => { const projects = new Map() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -132,7 +132,7 @@ test('Reject invite', async (t) => { encryptionKeys, }) - t.is(response, MapeoRPC.InviteResponse.REJECT) + t.is(response, LocalPeers.InviteResponse.REJECT) }) inviteApi.on('invite-received', async ({ projectId }) => { @@ -152,7 +152,7 @@ test('Receiving invite for project that peer already belongs to', async (t) => { const { rpc: r1, projectKey, encryptionKeys } = setup() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -176,7 +176,7 @@ test('Receiving invite for project that peer already belongs to', async (t) => { t.is( response, - MapeoRPC.InviteResponse.ALREADY, + LocalPeers.InviteResponse.ALREADY, 'invited peer automatically responds with "ALREADY"' ) }) @@ -195,7 +195,7 @@ test('Receiving invite for project that peer already belongs to', async (t) => { const { rpc: r1, projectKey, encryptionKeys } = setup() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() let isMember = false const inviteApi = new InviteApi({ @@ -220,7 +220,7 @@ test('Receiving invite for project that peer already belongs to', async (t) => { t.is( response, - MapeoRPC.InviteResponse.ALREADY, + LocalPeers.InviteResponse.ALREADY, 'invited peer automatically responds with "ALREADY"' ) }) @@ -242,7 +242,7 @@ test('Receiving invite for project that peer already belongs to', async (t) => { const projects = new Map() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -262,14 +262,14 @@ test('Receiving invite for project that peer already belongs to', async (t) => { encryptionKeys, }) - t.is(response1, MapeoRPC.InviteResponse.ACCEPT) + t.is(response1, LocalPeers.InviteResponse.ACCEPT) const response2 = await r1.invite(peers[0].id, { projectKey, encryptionKeys, }) - t.is(response2, MapeoRPC.InviteResponse.ALREADY) + t.is(response2, LocalPeers.InviteResponse.ALREADY) }) let inviteReceivedEventCount = 0 @@ -286,7 +286,7 @@ test('Receiving invite for project that peer already belongs to', async (t) => { }) test('trying to accept or reject non-existent invite throws', async (t) => { - const rpc = new MapeoRPC() + const rpc = new LocalPeers() const inviteApi = new InviteApi({ rpc, queries: { @@ -307,7 +307,7 @@ test('invitor disconnecting results in accept throwing', async (t) => { const { rpc: r1, projectKey, encryptionKeys } = setup() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -345,7 +345,7 @@ test('invitor disconnecting results in invite reject response not throwing', asy const { rpc: r1, projectKey, encryptionKeys } = setup() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -381,7 +381,7 @@ test('invitor disconnecting results in invite already response not throwing', as const { rpc: r1, projectKey, encryptionKeys } = setup() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() let isMember = false @@ -422,7 +422,7 @@ test('addProject throwing results in invite accept throwing', async (t) => { const { rpc: r1, projectKey, encryptionKeys } = setup() - const r2 = new MapeoRPC() + const r2 = new LocalPeers() const inviteApi = new InviteApi({ rpc: r2, @@ -455,7 +455,7 @@ test('Invite from multiple peers', async (t) => { t.plan(5 + invitorCount) const { projectKey, encryptionKeys } = setup() - const invitee = new MapeoRPC() + const invitee = new LocalPeers() const inviteeKeyPair = NoiseSecretStream.keyPair() const projects = new Map() @@ -492,7 +492,7 @@ test('Invite from multiple peers', async (t) => { }) for (let i = 0; i < invitorCount; i++) { - const invitor = new MapeoRPC() + const invitor = new LocalPeers() const keyPair = NoiseSecretStream.keyPair() invitor.on('peers', async (peers) => { if (++connected === invitorCount) deferred.resolve() @@ -502,9 +502,9 @@ test('Invite from multiple peers', async (t) => { }) if (first === keyPair.publicKey.toString('hex')) { t.pass('One invitor did receive accept response') - t.is(response, MapeoRPC.InviteResponse.ACCEPT, 'accept response') + t.is(response, LocalPeers.InviteResponse.ACCEPT, 'accept response') } else { - t.is(response, MapeoRPC.InviteResponse.ALREADY, 'already response') + t.is(response, LocalPeers.InviteResponse.ALREADY, 'already response') } }) replicate(invitee, invitor, { kp1: inviteeKeyPair, kp2: keyPair }) @@ -517,7 +517,7 @@ test.skip('Invite from multiple peers, first disconnects before accepted, receiv t.plan(8 + invitorCount) const { projectKey, encryptionKeys } = setup() - const invitee = new MapeoRPC() + const invitee = new LocalPeers() const inviteeKeyPair = NoiseSecretStream.keyPair() const projects = new Map() @@ -562,7 +562,7 @@ test.skip('Invite from multiple peers, first disconnects before accepted, receiv }) for (let i = 0; i < invitorCount; i++) { - const invitor = new MapeoRPC() + const invitor = new LocalPeers() const keyPair = NoiseSecretStream.keyPair() const invitorId = keyPair.publicKey.toString('hex') invitor.on('peers', async (peers) => { @@ -575,9 +575,9 @@ test.skip('Invite from multiple peers, first disconnects before accepted, receiv }) if (invitorId === invitesReceived[1]) { t.pass('One invitor did receive accept response') - t.is(response, MapeoRPC.InviteResponse.ACCEPT, 'accept response') + t.is(response, LocalPeers.InviteResponse.ACCEPT, 'accept response') } else { - t.is(response, MapeoRPC.InviteResponse.ALREADY, 'already response') + t.is(response, LocalPeers.InviteResponse.ALREADY, 'already response') } } catch (e) { t.is( @@ -598,7 +598,7 @@ test.skip('Invite from multiple peers, first disconnects before accepted, receiv function setup() { const encryptionKeys = { auth: randomBytes(32) } const projectKey = KeyManager.generateProjectKeypair().publicKey - const rpc = new MapeoRPC() + const rpc = new LocalPeers() return { rpc, diff --git a/tests/rpc.js b/tests/rpc.js index 83cd90698..680dc0427 100644 --- a/tests/rpc.js +++ b/tests/rpc.js @@ -1,7 +1,7 @@ // @ts-check import test from 'brittle' import { - MapeoRPC, + LocalPeers, PeerDisconnectedError, TimeoutError, UnknownPeerError, @@ -15,8 +15,8 @@ import NoiseSecretStream from '@hyperswarm/secret-stream' test('Send invite and accept', async (t) => { t.plan(3) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) @@ -26,14 +26,14 @@ test('Send invite and accept', async (t) => { projectKey, encryptionKeys: { auth: randomBytes(32) }, }) - t.is(response, MapeoRPC.InviteResponse.ACCEPT) + t.is(response, LocalPeers.InviteResponse.ACCEPT) }) r2.on('invite', (peerId, invite) => { t.ok(invite.projectKey.equals(projectKey), 'invite project key correct') r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }) }) @@ -41,8 +41,8 @@ test('Send invite and accept', async (t) => { }) test('Send invite immediately', async (t) => { - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) @@ -62,16 +62,16 @@ test('Send invite immediately', async (t) => { r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }) - t.is(await responsePromise, MapeoRPC.InviteResponse.ACCEPT) + t.is(await responsePromise, LocalPeers.InviteResponse.ACCEPT) }) test('Send invite and reject', async (t) => { t.plan(3) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) @@ -81,14 +81,14 @@ test('Send invite and reject', async (t) => { projectKey, encryptionKeys: { auth: randomBytes(32) }, }) - t.is(response, MapeoRPC.InviteResponse.REJECT) + t.is(response, LocalPeers.InviteResponse.REJECT) }) r2.on('invite', (peerId, invite) => { t.ok(invite.projectKey.equals(projectKey), 'invite project key correct') r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.REJECT, + decision: LocalPeers.InviteResponse.REJECT, }) }) @@ -96,8 +96,8 @@ test('Send invite and reject', async (t) => { }) test('Invite to unknown peer', async (t) => { - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) const unknownPeerId = Buffer.allocUnsafe(32).fill(1).toString('hex') @@ -115,7 +115,7 @@ test('Invite to unknown peer', async (t) => { () => r2.inviteResponse(unknownPeerId, { projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }), UnknownPeerError ) @@ -123,8 +123,8 @@ test('Invite to unknown peer', async (t) => { test('Send invite and already on project', async (t) => { t.plan(3) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) @@ -134,14 +134,14 @@ test('Send invite and already on project', async (t) => { projectKey, encryptionKeys: { auth: randomBytes(32) }, }) - t.is(response, MapeoRPC.InviteResponse.ALREADY) + t.is(response, LocalPeers.InviteResponse.ALREADY) }) r2.on('invite', (peerId, invite) => { t.ok(invite.projectKey.equals(projectKey), 'invite project key correct') r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ALREADY, + decision: LocalPeers.InviteResponse.ALREADY, }) }) @@ -150,8 +150,8 @@ test('Send invite and already on project', async (t) => { test('Send invite with encryption key', async (t) => { t.plan(4) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) const encryptionKeys = { @@ -165,7 +165,7 @@ test('Send invite with encryption key', async (t) => { projectKey, encryptionKeys, }) - t.is(response, MapeoRPC.InviteResponse.ACCEPT) + t.is(response, LocalPeers.InviteResponse.ACCEPT) }) r2.on('invite', (peerId, invite) => { @@ -177,7 +177,7 @@ test('Send invite with encryption key', async (t) => { ) r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }) }) @@ -186,8 +186,8 @@ test('Send invite with encryption key', async (t) => { test('Send invite with project info', async (t) => { t.plan(4) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) const projectInfo = { name: 'MyProject' } @@ -199,7 +199,7 @@ test('Send invite with project info', async (t) => { projectInfo, encryptionKeys: { auth: randomBytes(32) }, }) - t.is(response, MapeoRPC.InviteResponse.ACCEPT) + t.is(response, LocalPeers.InviteResponse.ACCEPT) }) r2.on('invite', (peerId, invite) => { @@ -207,7 +207,7 @@ test('Send invite with project info', async (t) => { t.alike(invite.projectInfo, projectInfo, 'project info is sent with invite') r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }) }) @@ -216,8 +216,8 @@ test('Send invite with project info', async (t) => { test('Disconnected peer shows in state', async (t) => { t.plan(6) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() let peerStateUpdates = 0 r1.on('peers', async (peers) => { @@ -237,8 +237,8 @@ test('Disconnected peer shows in state', async (t) => { test('Disconnect results in rejected invite', async (t) => { t.plan(2) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) @@ -268,9 +268,9 @@ test('Disconnect results in rejected invite', async (t) => { test('Invite to multiple peers', async (t) => { // This is catches not tracking invites by peer t.plan(2) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() - const r3 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() + const r3 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) @@ -287,7 +287,7 @@ test('Invite to multiple peers', async (t) => { ) t.alike( responses.sort(), - [MapeoRPC.InviteResponse.ACCEPT, MapeoRPC.InviteResponse.REJECT], + [LocalPeers.InviteResponse.ACCEPT, LocalPeers.InviteResponse.REJECT], 'One peer accepted, one rejected' ) }) @@ -295,14 +295,14 @@ test('Invite to multiple peers', async (t) => { r2.on('invite', (peerId, invite) => { r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }) }) r3.on('invite', (peerId, invite) => { r3.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.REJECT, + decision: LocalPeers.InviteResponse.REJECT, }) }) @@ -314,8 +314,8 @@ test('Invite to multiple peers', async (t) => { test('Multiple invites to a peer, only one response', async (t) => { t.plan(2) let count = 0 - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) @@ -334,7 +334,7 @@ test('Multiple invites to a peer, only one response', async (t) => { encryptionKeys: { auth: randomBytes(32) }, }), ]) - const expected = Array(3).fill(MapeoRPC.InviteResponse.ACCEPT) + const expected = Array(3).fill(LocalPeers.InviteResponse.ACCEPT) t.alike(responses, expected) }) @@ -344,7 +344,7 @@ test('Multiple invites to a peer, only one response', async (t) => { t.is(count, 3) r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }) }) @@ -356,8 +356,8 @@ test('Default: invites do not timeout', async (t) => { t.teardown(() => clock.uninstall()) t.plan(1) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) @@ -381,8 +381,8 @@ test('Invite timeout', async (t) => { t.teardown(() => clock.uninstall()) t.plan(1) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) @@ -402,8 +402,8 @@ test('Invite timeout', async (t) => { }) test('Reconnect peer and send invite', async (t) => { - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() const projectKey = Buffer.allocUnsafe(32).fill(0) @@ -418,7 +418,7 @@ test('Reconnect peer and send invite', async (t) => { t.ok(invite.projectKey.equals(projectKey), 'invite project key correct') r2.inviteResponse(peerId, { projectKey: invite.projectKey, - decision: MapeoRPC.InviteResponse.ACCEPT, + decision: LocalPeers.InviteResponse.ACCEPT, }) }) @@ -430,21 +430,20 @@ test('Reconnect peer and send invite', async (t) => { projectKey, encryptionKeys: { auth: randomBytes(32) }, }) - t.is(response, MapeoRPC.InviteResponse.ACCEPT) + t.is(response, LocalPeers.InviteResponse.ACCEPT) }) test('invalid stream', (t) => { - const r1 = new MapeoRPC() + const r1 = new LocalPeers() const regularStream = new Duplex() - // @ts-expect-error t.exception(() => r1.connect(regularStream), 'Invalid stream') }) test('Send device info', async (t) => { t.plan(3) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() /** @type {import('../src/generated/rpc.js').DeviceInfo} */ const expectedDeviceInfo = { name: 'mapeo' } @@ -463,8 +462,8 @@ test('Send device info', async (t) => { }) test('Send device info immediately', async (t) => { - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() /** @type {import('../src/generated/rpc.js').DeviceInfo} */ const expectedDeviceInfo = { name: 'mapeo' } @@ -484,8 +483,8 @@ test('Send device info immediately', async (t) => { test('Reconnect peer and send device info', async (t) => { t.plan(6) - const r1 = new MapeoRPC() - const r2 = new MapeoRPC() + const r1 = new LocalPeers() + const r2 = new LocalPeers() /** @type {import('../src/generated/rpc.js').DeviceInfo} */ const expectedDeviceInfo = { name: 'mapeo' } From e301c85ed133500d8f8e01ea97f9402992111413 Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Thu, 26 Oct 2023 16:39:16 +0900 Subject: [PATCH 3/9] Handle deviceInfo internally, id -> deviceId --- src/rpc/index.js | 67 ++++++++++++++++++++++++++++++------ test-e2e/manager-invite.js | 8 ++--- test-e2e/members.js | 2 +- tests/invite-api.js | 28 ++++++++-------- tests/rpc.js | 69 +++++++++++++++++++------------------- 5 files changed, 111 insertions(+), 63 deletions(-) diff --git a/src/rpc/index.js b/src/rpc/index.js index 29e2deac0..5caa9b72c 100644 --- a/src/rpc/index.js +++ b/src/rpc/index.js @@ -24,9 +24,18 @@ const MESSAGE_TYPES = { } const MESSAGES_MAX_ID = Math.max.apply(null, [...Object.values(MESSAGE_TYPES)]) -/** @typedef {Peer['info']} PeerInfoInternal */ -/** @typedef {Omit & { status: Exclude }} PeerInfo */ -/** @typedef {'connecting' | 'connected' | 'disconnected'} PeerState */ +/** + * @typedef {object} PeerInfoBase + * @property {string} deviceId + * @property {string | undefined} name + */ +/** @typedef {PeerInfoBase & { status: 'connecting' }} PeerInfoConnecting */ +/** @typedef {PeerInfoBase & { status: 'connected', connectedAt: number }} PeerInfoConnected */ +/** @typedef {PeerInfoBase & { status: 'disconnected', disconnectedAt: number }} PeerInfoDisconnected */ + +/** @typedef {PeerInfoConnecting | PeerInfoConnected | PeerInfoDisconnected} PeerInfoInternal */ +/** @typedef {PeerInfoConnected | PeerInfoDisconnected} PeerInfo */ +/** @typedef {PeerInfoInternal['status']} PeerState */ /** @typedef {import('type-fest').SetNonNullable} InviteWithKeys */ /** @@ -44,6 +53,10 @@ class Peer { #connected /** @type {Map>>} */ pendingInvites = new Map() + /** @type {string | undefined} */ + #name + #connectedAt = 0 + #disconnectedAt = 0 /** * @param {object} options @@ -55,10 +68,36 @@ class Peer { this.#channel = channel this.#connected = pDefer() } + /** @returns {PeerInfoInternal} */ get info() { - return { - status: this.#state, - id: keyToId(this.#publicKey), + const deviceId = keyToId(this.#publicKey) + switch (this.#state) { + case 'connecting': + return { + status: this.#state, + deviceId, + name: this.#name, + } + case 'connected': + return { + status: this.#state, + deviceId, + name: this.#name, + connectedAt: this.#connectedAt, + } + case 'disconnected': + return { + status: this.#state, + deviceId, + name: this.#name, + disconnectedAt: this.#disconnectedAt, + } + /* c8 ignore next 4 */ + default: { + /** @type {never} */ + const _exhaustiveCheck = this.#state + return _exhaustiveCheck + } } } /** @@ -75,12 +114,16 @@ class Peer { return // TODO: report error - this should not happen } this.#state = 'connected' + this.#connectedAt = Date.now() this.#connected.resolve() break case 'disconnect': /* c8 ignore next */ if (this.#state === 'disconnected') return this.#state = 'disconnected' + this.#disconnectedAt = Date.now() + // Can just resolve this rather than reject, because #assertConnected will throw the error + this.#connected.resolve() for (const pending of this.pendingInvites.values()) { for (const { reject } of pending) { reject(new PeerDisconnectedError()) @@ -111,6 +154,10 @@ class Peer { const messageType = MESSAGE_TYPES.DeviceInfo this.#channel.messages[messageType].send(buf) } + /** @param {DeviceInfo} deviceInfo */ + receiveDeviceInfo(deviceInfo) { + this.#name = deviceInfo.name + } async #assertConnected() { await this.#connected.promise if (this.#state === 'connected' && !this.#channel.closed) return @@ -120,13 +167,12 @@ class Peer { } /** - * @typedef {object} MapeoRPCEvents + * @typedef {object} LocalPeersEvents * @property {(peers: PeerInfo[]) => void} peers Emitted whenever the connection status of peers changes. An array of peerInfo objects with a peer id and the peer connection status * @property {(peerId: string, invite: InviteWithKeys) => void} invite Emitted when an invite is received - * @property {(deviceInfo: DeviceInfo & { deviceId: string }) => void} device-info Emitted when we receive device info for a device */ -/** @extends {TypedEmitter} */ +/** @extends {TypedEmitter} */ export class LocalPeers extends TypedEmitter { /** @type {Map} */ #peers = new Map() @@ -348,7 +394,8 @@ export class LocalPeers extends TypedEmitter { } case 'DeviceInfo': { const deviceInfo = DeviceInfo.decode(value) - this.emit('device-info', { ...deviceInfo, deviceId: peerId }) + peer.receiveDeviceInfo(deviceInfo) + this.#emitPeers() break } /* c8 ignore next 5 */ diff --git a/test-e2e/manager-invite.js b/test-e2e/manager-invite.js index 437ba4297..dc19b283f 100644 --- a/test-e2e/manager-invite.js +++ b/test-e2e/manager-invite.js @@ -25,7 +25,7 @@ test('member invite accepted', async (t) => { creator[kRPC].on('peers', async (peers) => { t.is(peers.length, 1) - const response = await creatorProject.$member.invite(peers[0].id, { + const response = await creatorProject.$member.invite(peers[0].deviceId, { roleId: MEMBER_ROLE_ID, }) @@ -52,7 +52,7 @@ test('member invite accepted', async (t) => { joiner[kRPC].on('peers', (peers) => { t.is(peers.length, 1) - expectedInvitorPeerId = peers[0].id + expectedInvitorPeerId = peers[0].deviceId }) joiner.invite.on('invite-received', async (invite) => { @@ -119,7 +119,7 @@ test('member invite rejected', async (t) => { creator[kRPC].on('peers', async (peers) => { t.is(peers.length, 1) - const response = await creatorProject.$member.invite(peers[0].id, { + const response = await creatorProject.$member.invite(peers[0].deviceId, { roleId: MEMBER_ROLE_ID, }) @@ -146,7 +146,7 @@ test('member invite rejected', async (t) => { joiner[kRPC].on('peers', (peers) => { t.is(peers.length, 1) - expectedInvitorPeerId = peers[0].id + expectedInvitorPeerId = peers[0].deviceId }) joiner.invite.on('invite-received', async (invite) => { diff --git a/test-e2e/members.js b/test-e2e/members.js index cbef3fef8..8fe068478 100644 --- a/test-e2e/members.js +++ b/test-e2e/members.js @@ -191,7 +191,7 @@ function setup() { }) manager[kRPC].on('peers', (peers) => { - const deviceId = peers[0].id + const deviceId = peers[0].deviceId project.$member .invite(deviceId, { roleId }) .then(() => deferred.resolve(deviceId)) diff --git a/tests/invite-api.js b/tests/invite-api.js index 19f058dbd..4ba0909fb 100644 --- a/tests/invite-api.js +++ b/tests/invite-api.js @@ -33,13 +33,13 @@ test('invite-received event has expected payload', async (t) => { r2.on('peers', (peers) => { t.is(peers.length, 1) - expectedInvitorPeerId = peers[0].id + expectedInvitorPeerId = peers[0].deviceId }) r1.on('peers', (peers) => { t.is(peers.length, 1) - r1.invite(peers[0].id, { + r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, projectInfo: { name: 'Mapeo' }, @@ -83,7 +83,7 @@ test('Accept invite', async (t) => { r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) @@ -127,7 +127,7 @@ test('Reject invite', async (t) => { r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) @@ -169,7 +169,7 @@ test('Receiving invite for project that peer already belongs to', async (t) => { r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) @@ -213,7 +213,7 @@ test('Receiving invite for project that peer already belongs to', async (t) => { r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) @@ -257,14 +257,14 @@ test('Receiving invite for project that peer already belongs to', async (t) => { }) r1.on('peers', async (peers) => { - const response1 = await r1.invite(peers[0].id, { + const response1 = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) t.is(response1, LocalPeers.InviteResponse.ACCEPT) - const response2 = await r1.invite(peers[0].id, { + const response2 = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) @@ -322,7 +322,7 @@ test('invitor disconnecting results in accept throwing', async (t) => { r1.on('peers', async (peers) => { if (peers.length !== 1 || peers[0].status === 'disconnected') return await t.exception(() => { - return r1.invite(peers[0].id, { + return r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) @@ -359,7 +359,7 @@ test('invitor disconnecting results in invite reject response not throwing', asy if (peers.length !== 1 || peers[0].status === 'disconnected') return await t.exception(() => { - return r1.invite(peers[0].id, { + return r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) @@ -399,7 +399,7 @@ test('invitor disconnecting results in invite already response not throwing', as if (peers.length !== 1 || peers[0].status === 'disconnected') return await t.exception(() => { - return r1.invite(peers[0].id, { + return r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) @@ -435,7 +435,7 @@ test('addProject throwing results in invite accept throwing', async (t) => { }) r1.on('peers', (peers) => { - r1.invite(peers[0].id, { + r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) @@ -496,7 +496,7 @@ test('Invite from multiple peers', async (t) => { const keyPair = NoiseSecretStream.keyPair() invitor.on('peers', async (peers) => { if (++connected === invitorCount) deferred.resolve() - const response = await invitor.invite(peers[0].id, { + const response = await invitor.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) @@ -569,7 +569,7 @@ test.skip('Invite from multiple peers, first disconnects before accepted, receiv if (peers[0].status !== 'connected') return if (++connected === invitorCount) deferred.resolve() try { - const response = await invitor.invite(peers[0].id, { + const response = await invitor.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) diff --git a/tests/rpc.js b/tests/rpc.js index 680dc0427..e14633686 100644 --- a/tests/rpc.js +++ b/tests/rpc.js @@ -22,7 +22,7 @@ test('Send invite and accept', async (t) => { r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }) @@ -77,7 +77,7 @@ test('Send invite and reject', async (t) => { r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }) @@ -130,7 +130,7 @@ test('Send invite and already on project', async (t) => { r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }) @@ -161,7 +161,7 @@ test('Send invite with encryption key', async (t) => { r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys, }) @@ -194,7 +194,7 @@ test('Send invite with project info', async (t) => { r1.on('peers', async (peers) => { t.is(peers.length, 1) - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, projectInfo, encryptionKeys: { auth: randomBytes(32) }, @@ -244,7 +244,7 @@ test('Disconnect results in rejected invite', async (t) => { r1.on('peers', async (peers) => { if (peers[0].status === 'connected') { - const invite = r1.invite(peers[0].id, { + const invite = r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }) @@ -279,7 +279,7 @@ test('Invite to multiple peers', async (t) => { t.pass('connected to two peers') const responses = await Promise.all( peers.map((peer) => - r1.invite(peer.id, { + r1.invite(peer.deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }) @@ -321,15 +321,15 @@ test('Multiple invites to a peer, only one response', async (t) => { r1.on('peers', async (peers) => { const responses = await Promise.all([ - r1.invite(peers[0].id, { + r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }), - r1.invite(peers[0].id, { + r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }), - r1.invite(peers[0].id, { + r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }), @@ -362,7 +362,7 @@ test('Default: invites do not timeout', async (t) => { const projectKey = Buffer.allocUnsafe(32).fill(0) r1.once('peers', async (peers) => { - r1.invite(peers[0].id, { + r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }).then( @@ -388,7 +388,7 @@ test('Invite timeout', async (t) => { r1.once('peers', async (peers) => { t.exception( - r1.invite(peers[0].id, { + r1.invite(peers[0].deviceId, { projectKey, timeout: 5000, encryptionKeys: { auth: randomBytes(32) }, @@ -426,7 +426,7 @@ test('Reconnect peer and send invite', async (t) => { const [peers] = await once(r1, 'peers') t.is(r1.peers.length, 1) t.is(peers[0].status, 'connected') - const response = await r1.invite(peers[0].id, { + const response = await r1.invite(peers[0].deviceId, { projectKey, encryptionKeys: { auth: randomBytes(32) }, }) @@ -440,8 +440,6 @@ test('invalid stream', (t) => { }) test('Send device info', async (t) => { - t.plan(3) - const r1 = new LocalPeers() const r2 = new LocalPeers() @@ -450,15 +448,18 @@ test('Send device info', async (t) => { r1.on('peers', async (peers) => { t.is(peers.length, 1) - r1.sendDeviceInfo(peers[0].id, expectedDeviceInfo) - }) - - r2.on('device-info', ({ deviceId, ...deviceInfo }) => { - t.ok(deviceId) - t.alike(deviceInfo, expectedDeviceInfo) + r1.sendDeviceInfo(peers[0].deviceId, expectedDeviceInfo) }) replicate(r1, r2) + + await new Promise((res) => { + r2.on('peers', (peers) => { + if (!(peers.length === 1 && peers[0].name)) return + t.is(peers[0].name, expectedDeviceInfo.name) + res(true) + }) + }) }) test('Send device info immediately', async (t) => { @@ -475,14 +476,16 @@ test('Send device info immediately', async (t) => { r1.sendDeviceInfo(kp2.publicKey.toString('hex'), expectedDeviceInfo) - const [{ deviceId, ...deviceInfo }] = await once(r2, 'device-info') - t.ok(deviceId) - t.alike(deviceInfo, expectedDeviceInfo) + await new Promise((res) => { + r2.on('peers', (peers) => { + if (!(peers.length === 1 && peers[0].name)) return + t.is(peers[0].name, expectedDeviceInfo.name) + res(true) + }) + }) }) test('Reconnect peer and send device info', async (t) => { - t.plan(6) - const r1 = new LocalPeers() const r2 = new LocalPeers() @@ -496,16 +499,14 @@ test('Reconnect peer and send device info', async (t) => { t.is(r1.peers.length, 1) t.is(r1.peers[0].status, 'disconnected') - r2.on('device-info', ({ deviceId, ...deviceInfo }) => { - t.ok(deviceId) - t.alike(deviceInfo, expectedDeviceInfo) - }) - replicate(r1, r2) - const [peers] = await once(r1, 'peers') + const [r1peers] = await once(r1, 'peers') t.is(r1.peers.length, 1) - t.is(peers[0].status, 'connected') + t.is(r1peers[0].status, 'connected') + + r1.sendDeviceInfo(r1peers[0].deviceId, expectedDeviceInfo) - r1.sendDeviceInfo(peers[0].id, expectedDeviceInfo) + const [r2Peers] = await once(r2, 'peers') + t.is(r2Peers[0].name, expectedDeviceInfo.name) }) From ae371fd8f339ba8d343e41a51d7a7849a203a003 Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Thu, 26 Oct 2023 17:01:43 +0900 Subject: [PATCH 4/9] Tests for stream error handling --- src/rpc/index.js | 3 +++ tests/helpers/rpc.js | 8 ++++---- tests/rpc.js | 15 +++++++++++++-- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/src/rpc/index.js b/src/rpc/index.js index 5caa9b72c..e1a019960 100644 --- a/src/rpc/index.js +++ b/src/rpc/index.js @@ -277,6 +277,9 @@ export class LocalPeers extends TypedEmitter { : Protomux.from(stream) this.#opening.add(stream.opened) + // No need to connect error handler to stream because Protomux does this, + // and errors are eventually handled by #closePeer + // noiseSecretStream.remotePublicKey can be null before the stream has // opened, so this helped awaits the open openedNoiseSecretStream(stream).then((stream) => { diff --git a/tests/helpers/rpc.js b/tests/helpers/rpc.js index c735096e5..0b71b0bfa 100644 --- a/tests/helpers/rpc.js +++ b/tests/helpers/rpc.js @@ -8,7 +8,6 @@ import NoiseSecretStream from '@hyperswarm/secret-stream' * @param {import('../../src/rpc/index.js').LocalPeers} rpc1 * @param {import('../../src/rpc/index.js').LocalPeers} rpc2 * @param { {kp1?: KeyPair, kp2?: KeyPair} } [keyPairs] - * @returns {() => Promise<[void, void]>} */ export function replicate( rpc1, @@ -32,20 +31,21 @@ export function replicate( rpc1.connect(n1) rpc2.connect(n2) - return async function destroy() { + /** @param {Error} [e] */ + return async function destroy(e) { return Promise.all([ /** @type {Promise} */ ( new Promise((res) => { n1.on('close', res) - n1.destroy() + n1.destroy(e) }) ), /** @type {Promise} */ ( new Promise((res) => { n2.on('close', res) - n2.destroy() + n2.destroy(e) }) ), ]) diff --git a/tests/rpc.js b/tests/rpc.js index e14633686..695d8a77a 100644 --- a/tests/rpc.js +++ b/tests/rpc.js @@ -225,7 +225,7 @@ test('Disconnected peer shows in state', async (t) => { if (peers[0].status === 'connected') { t.pass('peer appeared as connected') t.is(++peerStateUpdates, 1) - destroy() + destroy(new Error()) } else { t.pass('peer appeared as disconnected') t.is(++peerStateUpdates, 2) @@ -235,6 +235,16 @@ test('Disconnected peer shows in state', async (t) => { const destroy = replicate(r1, r2) }) +test('next tick disconnect does not throw', async (t) => { + const r1 = new LocalPeers() + const r2 = new LocalPeers() + + const destroy = replicate(r1, r2) + await Promise.resolve() + destroy(new Error()) + t.pass() +}) + test('Disconnect results in rejected invite', async (t) => { t.plan(2) const r1 = new LocalPeers() @@ -390,11 +400,12 @@ test('Invite timeout', async (t) => { t.exception( r1.invite(peers[0].deviceId, { projectKey, - timeout: 5000, + timeout: 1000, encryptionKeys: { auth: randomBytes(32) }, }), TimeoutError ) + // Not working right now, because of the new async code clock.tick(5001) }) From ccdf39f2519596e04dac31c49ff630bf052ced6e Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Thu, 26 Oct 2023 17:04:19 +0900 Subject: [PATCH 5/9] remove unnecessary constructor --- src/rpc/index.js | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/rpc/index.js b/src/rpc/index.js index e1a019960..e1c6fb4c4 100644 --- a/src/rpc/index.js +++ b/src/rpc/index.js @@ -179,10 +179,6 @@ export class LocalPeers extends TypedEmitter { /** @type {Set>} */ #opening = new Set() - constructor() { - super() - } - static InviteResponse = InviteResponse_Decision /** From a52254b08f4bf00f1b34d5f93ef6d6d5fbc9f32c Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Thu, 26 Oct 2023 17:10:54 +0900 Subject: [PATCH 6/9] return replication stream --- src/rpc/index.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/rpc/index.js b/src/rpc/index.js index e1c6fb4c4..8ec334fc0 100644 --- a/src/rpc/index.js +++ b/src/rpc/index.js @@ -263,7 +263,8 @@ export class LocalPeers extends TypedEmitter { /** * Connect to a peer over an existing NoiseSecretStream * - * @param {import('../types.js').NoiseStream | import('../types.js').ProtocolStream} stream a NoiseSecretStream from @hyperswarm/secret-stream + * @param {import('../types.js').NoiseStream} stream a NoiseSecretStream from @hyperswarm/secret-stream + * @returns {import('../types.js').ReplicationStream} */ connect(stream) { if (!stream.noiseStream) throw new Error('Invalid stream') @@ -271,6 +272,7 @@ export class LocalPeers extends TypedEmitter { stream.userData && Protomux.isProtomux(stream.userData) ? stream.userData : Protomux.from(stream) + stream.userData = protomux this.#opening.add(stream.opened) // No need to connect error handler to stream because Protomux does this, @@ -315,7 +317,7 @@ export class LocalPeers extends TypedEmitter { // Do not emit peers now - will emit when connected }) - return stream + return stream.rawStream } /** @param {Buffer} publicKey */ From 50698e6935a675ddce8136dd7f07303de0ebecb3 Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Thu, 26 Oct 2023 17:34:32 +0900 Subject: [PATCH 7/9] Attach protomux instance to peer info --- src/rpc/index.js | 82 +++++++++++++++++++++++------------------------- tests/rpc.js | 10 ++++++ 2 files changed, 50 insertions(+), 42 deletions(-) diff --git a/src/rpc/index.js b/src/rpc/index.js index 8ec334fc0..273fa0ec6 100644 --- a/src/rpc/index.js +++ b/src/rpc/index.js @@ -30,7 +30,7 @@ const MESSAGES_MAX_ID = Math.max.apply(null, [...Object.values(MESSAGE_TYPES)]) * @property {string | undefined} name */ /** @typedef {PeerInfoBase & { status: 'connecting' }} PeerInfoConnecting */ -/** @typedef {PeerInfoBase & { status: 'connected', connectedAt: number }} PeerInfoConnected */ +/** @typedef {PeerInfoBase & { status: 'connected', connectedAt: number, protomux: Protomux }} PeerInfoConnected */ /** @typedef {PeerInfoBase & { status: 'disconnected', disconnectedAt: number }} PeerInfoDisconnected */ /** @typedef {PeerInfoConnecting | PeerInfoConnected | PeerInfoDisconnected} PeerInfoInternal */ @@ -57,6 +57,8 @@ class Peer { #name #connectedAt = 0 #disconnectedAt = 0 + /** @type {Protomux} */ + #protomux /** * @param {object} options @@ -84,6 +86,7 @@ class Peer { deviceId, name: this.#name, connectedAt: this.#connectedAt, + protomux: this.#protomux, } case 'disconnected': return { @@ -100,38 +103,32 @@ class Peer { } } } - /** - * Poor-man's finite state machine. Rather than a `setState` method, only - * allows specific transitions between states. - * - * @param {'connect' | 'disconnect'} type - */ - action(type) { - switch (type) { - case 'connect': - /* c8 ignore next 3 */ - if (this.#state !== 'connecting') { - return // TODO: report error - this should not happen - } - this.#state = 'connected' - this.#connectedAt = Date.now() - this.#connected.resolve() - break - case 'disconnect': - /* c8 ignore next */ - if (this.#state === 'disconnected') return - this.#state = 'disconnected' - this.#disconnectedAt = Date.now() - // Can just resolve this rather than reject, because #assertConnected will throw the error - this.#connected.resolve() - for (const pending of this.pendingInvites.values()) { - for (const { reject } of pending) { - reject(new PeerDisconnectedError()) - } - } - this.pendingInvites.clear() - break + /** @param {Protomux} protomux */ + connect(protomux) { + this.#protomux = protomux + /* c8 ignore next 3 */ + if (this.#state !== 'connecting') { + return // TODO: report error - this should not happen } + this.#state = 'connected' + this.#connectedAt = Date.now() + this.#connected.resolve() + } + disconnect() { + // @ts-ignore - easier to ignore this than handle this for TS - avoids holding a reference to old Protomux instances + this.#protomux = undefined + /* c8 ignore next */ + if (this.#state === 'disconnected') return + this.#state = 'disconnected' + this.#disconnectedAt = Date.now() + // Can just resolve this rather than reject, because #assertConnected will throw the error + this.#connected.resolve() + for (const pending of this.pendingInvites.values()) { + for (const { reject } of pending) { + reject(new PeerDisconnectedError()) + } + } + this.pendingInvites.clear() } /** @param {InviteWithKeys} invite */ async sendInvite(invite) { @@ -301,7 +298,7 @@ export class LocalPeers extends TypedEmitter { userData: null, protocol: PROTOCOL_NAME, messages, - onopen: this.#openPeer.bind(this, remotePublicKey), + onopen: this.#openPeer.bind(this, remotePublicKey, protomux), onclose: this.#closePeer.bind(this, remotePublicKey), }) channel.open() @@ -310,7 +307,7 @@ export class LocalPeers extends TypedEmitter { const existingPeer = this.#peers.get(peerId) /* c8 ignore next 3 */ if (existingPeer && existingPeer.info.status !== 'disconnected') { - existingPeer.action('disconnect') // Should not happen, but in case + existingPeer.disconnect() // Should not happen, but in case } const peer = new Peer({ publicKey: remotePublicKey, channel }) this.#peers.set(peerId, peer) @@ -320,17 +317,18 @@ export class LocalPeers extends TypedEmitter { return stream.rawStream } - /** @param {Buffer} publicKey */ - #openPeer(publicKey) { + /** + * @param {Buffer} publicKey + * @param {Protomux} protomux + */ + #openPeer(publicKey, protomux) { const peerId = keyToId(publicKey) const peer = this.#peers.get(peerId) /* c8 ignore next */ if (!peer) return // TODO: report error - this should not happen - // No-op if no change in state - /* c8 ignore next */ - if (peer.info.status === 'connected') return // TODO: report error - this should not happen - peer.action('connect') - this.#emitPeers() + const wasConnected = peer.info.status === 'connected' + peer.connect(protomux) + if (!wasConnected) this.#emitPeers() } /** @param {Buffer} publicKey */ @@ -343,7 +341,7 @@ export class LocalPeers extends TypedEmitter { /* c8 ignore next */ if (peer.info.status === 'disconnected') return // TODO: Track reasons for closing - peer.action('disconnect') + peer.disconnect() this.#emitPeers() } diff --git a/tests/rpc.js b/tests/rpc.js index 695d8a77a..bb1cc46e4 100644 --- a/tests/rpc.js +++ b/tests/rpc.js @@ -12,6 +12,7 @@ import { Duplex } from 'streamx' import { replicate } from './helpers/rpc.js' import { randomBytes } from 'node:crypto' import NoiseSecretStream from '@hyperswarm/secret-stream' +import Protomux from 'protomux' test('Send invite and accept', async (t) => { t.plan(3) @@ -521,3 +522,12 @@ test('Reconnect peer and send device info', async (t) => { const [r2Peers] = await once(r2, 'peers') t.is(r2Peers[0].name, expectedDeviceInfo.name) }) + +test('connected peer has protomux instance', async (t) => { + const r1 = new LocalPeers() + const r2 = new LocalPeers() + replicate(r1, r2) + const [[peer]] = await once(r1, 'peers') + t.is(peer.status, 'connected') + t.ok(Protomux.isProtomux(peer.protomux)) +}) From ae35e9ca322268af8ee4660ea3b457d43d717787 Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Thu, 26 Oct 2023 17:47:02 +0900 Subject: [PATCH 8/9] rename and re-organize --- src/invite-api.js | 2 +- src/{rpc/index.js => local-peers.js} | 12 ++++++------ src/mapeo-manager.js | 2 +- src/mapeo-project.js | 2 +- src/member-api.js | 2 +- test-e2e/manager-invite.js | 2 +- test-e2e/members.js | 2 +- test-types/data-types.ts | 2 +- tests/helpers/{rpc.js => local-peers.js} | 4 ++-- tests/invite-api.js | 4 ++-- tests/{rpc.js => local-peers.js} | 4 ++-- 11 files changed, 19 insertions(+), 19 deletions(-) rename src/{rpc/index.js => local-peers.js} (96%) rename tests/helpers/{rpc.js => local-peers.js} (90%) rename tests/{rpc.js => local-peers.js} (99%) diff --git a/src/invite-api.js b/src/invite-api.js index 630d1f42b..03dd72970 100644 --- a/src/invite-api.js +++ b/src/invite-api.js @@ -47,7 +47,7 @@ export class InviteApi extends TypedEmitter { /** * @param {Object} options - * @param {import('./rpc/index.js').LocalPeers} options.rpc + * @param {import('./local-peers.js').LocalPeers} options.rpc * @param {object} options.queries * @param {(projectId: string) => Promise} options.queries.isMember * @param {(invite: import('./generated/rpc.js').Invite) => Promise} options.queries.addProject diff --git a/src/rpc/index.js b/src/local-peers.js similarity index 96% rename from src/rpc/index.js rename to src/local-peers.js index 273fa0ec6..5989a5bd8 100644 --- a/src/rpc/index.js +++ b/src/local-peers.js @@ -1,14 +1,14 @@ // @ts-check import { TypedEmitter } from 'tiny-typed-emitter' import Protomux from 'protomux' -import { openedNoiseSecretStream, keyToId } from '../utils.js' +import { openedNoiseSecretStream, keyToId } from './utils.js' import cenc from 'compact-encoding' import { DeviceInfo, Invite, InviteResponse, InviteResponse_Decision, -} from '../generated/rpc.js' +} from './generated/rpc.js' import pDefer from 'p-defer' const PROTOCOL_NAME = 'mapeo/rpc' @@ -16,7 +16,7 @@ const PROTOCOL_NAME = 'mapeo/rpc' // Protomux message types depend on the order that messages are added to a // channel (this needs to remain consistent). To avoid breaking changes, the // types here should not change. -/** @satisfies {{ [k in keyof typeof import('../generated/rpc.js')]?: number }} */ +/** @satisfies {{ [k in keyof typeof import('./generated/rpc.js')]?: number }} */ const MESSAGE_TYPES = { Invite: 0, InviteResponse: 1, @@ -36,7 +36,7 @@ const MESSAGES_MAX_ID = Math.max.apply(null, [...Object.values(MESSAGE_TYPES)]) /** @typedef {PeerInfoConnecting | PeerInfoConnected | PeerInfoDisconnected} PeerInfoInternal */ /** @typedef {PeerInfoConnected | PeerInfoDisconnected} PeerInfo */ /** @typedef {PeerInfoInternal['status']} PeerState */ -/** @typedef {import('type-fest').SetNonNullable} InviteWithKeys */ +/** @typedef {import('type-fest').SetNonNullable} InviteWithKeys */ /** * @template ValueType @@ -260,8 +260,8 @@ export class LocalPeers extends TypedEmitter { /** * Connect to a peer over an existing NoiseSecretStream * - * @param {import('../types.js').NoiseStream} stream a NoiseSecretStream from @hyperswarm/secret-stream - * @returns {import('../types.js').ReplicationStream} + * @param {import('./types.js').NoiseStream} stream a NoiseSecretStream from @hyperswarm/secret-stream + * @returns {import('./types.js').ReplicationStream} */ connect(stream) { if (!stream.noiseStream) throw new Error('Invalid stream') diff --git a/src/mapeo-manager.js b/src/mapeo-manager.js index b44128b8e..6573c0385 100644 --- a/src/mapeo-manager.js +++ b/src/mapeo-manager.js @@ -23,7 +23,7 @@ import { projectKeyToPublicId, } from './utils.js' import { RandomAccessFilePool } from './core-manager/random-access-file-pool.js' -import { LocalPeers } from './rpc/index.js' +import { LocalPeers } from './local-peers.js' import { InviteApi } from './invite-api.js' import { LocalDiscovery } from './discovery/local-discovery.js' diff --git a/src/mapeo-project.js b/src/mapeo-project.js index 8d6af591a..ed24cbc50 100644 --- a/src/mapeo-project.js +++ b/src/mapeo-project.js @@ -67,7 +67,7 @@ export class MapeoProject { * @param {import('drizzle-orm/better-sqlite3').BetterSQLite3Database} opts.sharedDb * @param {IndexWriter} opts.sharedIndexWriter * @param {import('./types.js').CoreStorage} opts.coreStorage Folder to store all hypercore data - * @param {import('./rpc/index.js').LocalPeers} opts.rpc + * @param {import('./local-peers.js').LocalPeers} opts.rpc * */ constructor({ diff --git a/src/member-api.js b/src/member-api.js index 6f92659db..1d2f490d2 100644 --- a/src/member-api.js +++ b/src/member-api.js @@ -22,7 +22,7 @@ export class MemberApi extends TypedEmitter { * @param {import('./core-ownership.js').CoreOwnership} opts.coreOwnership * @param {import('./generated/keys.js').EncryptionKeys} opts.encryptionKeys * @param {Buffer} opts.projectKey - * @param {import('./rpc/index.js').LocalPeers} opts.rpc + * @param {import('./local-peers.js').LocalPeers} opts.rpc * @param {Object} opts.dataTypes * @param {Pick} opts.dataTypes.deviceInfo * @param {Pick} opts.dataTypes.project diff --git a/test-e2e/manager-invite.js b/test-e2e/manager-invite.js index dc19b283f..4a0b866ce 100644 --- a/test-e2e/manager-invite.js +++ b/test-e2e/manager-invite.js @@ -5,7 +5,7 @@ import RAM from 'random-access-memory' import { MEMBER_ROLE_ID } from '../src/capabilities.js' import { InviteResponse_Decision } from '../src/generated/rpc.js' import { MapeoManager, kRPC } from '../src/mapeo-manager.js' -import { replicate } from '../tests/helpers/rpc.js' +import { replicate } from '../tests/helpers/local-peers.js' test('member invite accepted', async (t) => { t.plan(10) diff --git a/test-e2e/members.js b/test-e2e/members.js index 8fe068478..64c28dee9 100644 --- a/test-e2e/members.js +++ b/test-e2e/members.js @@ -11,7 +11,7 @@ import { MEMBER_ROLE_ID, NO_ROLE_CAPABILITIES, } from '../src/capabilities.js' -import { replicate } from '../tests/helpers/rpc.js' +import { replicate } from '../tests/helpers/local-peers.js' test('getting yourself after creating project', async (t) => { const { manager } = setup() diff --git a/test-types/data-types.ts b/test-types/data-types.ts index 74cef62a9..10b2fcb15 100644 --- a/test-types/data-types.ts +++ b/test-types/data-types.ts @@ -14,7 +14,7 @@ import { drizzle } from 'drizzle-orm/better-sqlite3' import RAM from 'random-access-memory' import { IndexWriter } from '../dist/index-writer/index.js' import { projectSettingsTable } from '../dist/schema/client.js' -import { LocalPeers } from '../dist/rpc/index.js' +import { LocalPeers } from '../dist/local-peers.js' import { Expect, type Equal } from './utils.js' type Forks = { forks: string[] } diff --git a/tests/helpers/rpc.js b/tests/helpers/local-peers.js similarity index 90% rename from tests/helpers/rpc.js rename to tests/helpers/local-peers.js index 0b71b0bfa..1b7a8ce84 100644 --- a/tests/helpers/rpc.js +++ b/tests/helpers/local-peers.js @@ -5,8 +5,8 @@ import NoiseSecretStream from '@hyperswarm/secret-stream' */ /** - * @param {import('../../src/rpc/index.js').LocalPeers} rpc1 - * @param {import('../../src/rpc/index.js').LocalPeers} rpc2 + * @param {import('../../src/local-peers.js').LocalPeers} rpc1 + * @param {import('../../src/local-peers.js').LocalPeers} rpc2 * @param { {kp1?: KeyPair, kp2?: KeyPair} } [keyPairs] */ export function replicate( diff --git a/tests/invite-api.js b/tests/invite-api.js index 4ba0909fb..010a955fc 100644 --- a/tests/invite-api.js +++ b/tests/invite-api.js @@ -1,10 +1,10 @@ import test from 'brittle' import { randomBytes } from 'crypto' import { KeyManager } from '@mapeo/crypto' -import { LocalPeers } from '../src/rpc/index.js' +import { LocalPeers } from '../src/local-peers.js' import { InviteApi } from '../src/invite-api.js' import { projectKeyToPublicId } from '../src/utils.js' -import { replicate } from './helpers/rpc.js' +import { replicate } from './helpers/local-peers.js' import NoiseSecretStream from '@hyperswarm/secret-stream' import pDefer from 'p-defer' diff --git a/tests/rpc.js b/tests/local-peers.js similarity index 99% rename from tests/rpc.js rename to tests/local-peers.js index bb1cc46e4..dc704f8a4 100644 --- a/tests/rpc.js +++ b/tests/local-peers.js @@ -5,11 +5,11 @@ import { PeerDisconnectedError, TimeoutError, UnknownPeerError, -} from '../src/rpc/index.js' +} from '../src/local-peers.js' import FakeTimers from '@sinonjs/fake-timers' import { once } from 'events' import { Duplex } from 'streamx' -import { replicate } from './helpers/rpc.js' +import { replicate } from './helpers/local-peers.js' import { randomBytes } from 'node:crypto' import NoiseSecretStream from '@hyperswarm/secret-stream' import Protomux from 'protomux' From be64a3d8ff714d596b72a71379a281a93c03231c Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Thu, 26 Oct 2023 20:19:09 +0900 Subject: [PATCH 9/9] revert changes outside scope of PR --- src/mapeo-manager.js | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/src/mapeo-manager.js b/src/mapeo-manager.js index 6573c0385..32965a237 100644 --- a/src/mapeo-manager.js +++ b/src/mapeo-manager.js @@ -17,7 +17,6 @@ import { ProjectKeys } from './generated/keys.js' import { deNullify, getDeviceId, - keyToId, projectIdToNonce, projectKeyToId, projectKeyToPublicId, @@ -25,7 +24,6 @@ import { import { RandomAccessFilePool } from './core-manager/random-access-file-pool.js' import { LocalPeers } from './local-peers.js' import { InviteApi } from './invite-api.js' -import { LocalDiscovery } from './discovery/local-discovery.js' /** @typedef {import("@mapeo/schema").ProjectSettingsValue} ProjectValue */ @@ -52,7 +50,6 @@ export class MapeoManager { #deviceId #rpc #invite - #localDiscovery /** * @param {Object} opts @@ -106,17 +103,6 @@ export class MapeoManager { } else { this.#coreStorage = coreStorage } - - this.#localDiscovery = new LocalDiscovery({ - identityKeypair: this.#keyManager.getIdentityKeypair(), - }) - - this.#localDiscovery.on('connection', (connection) => { - this.#handleDiscoveryConnection(connection).catch((e) => { - // Ignore errors here for now - console.error('Error handling discovery connection', e) - }) - }) } /** @@ -396,18 +382,6 @@ export class MapeoManager { return projectPublicId } - /** - * @param {import('./discovery/local-discovery.js').OpenedNoiseStream} connection - */ - async #handleDiscoveryConnection(connection) { - const peerId = keyToId(connection.remotePublicKey) - this.#rpc.connect(connection) - const { name } = await this.getDeviceInfo() - if (name) { - this.#rpc.sendDeviceInfo(peerId, { name }) - } - } - /** * @template {import('type-fest').Exact} T * @param {T} deviceInfo