diff --git a/package-lock.json b/package-lock.json index c7250fd00..4ba5e653d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -24,7 +24,6 @@ "better-sqlite3": "^8.7.0", "big-sparse-array": "^1.0.3", "bogon": "^1.1.0", - "bonjour-service": "^1.2.1", "compact-encoding": "^2.12.0", "corestore": "^6.8.4", "debug": "^4.3.4", @@ -770,11 +769,6 @@ "version": "1.10.1", "license": "Apache-2.0" }, - "node_modules/@leichtgewicht/ip-codec": { - "version": "2.0.4", - "resolved": "https://registry.npmjs.org/@leichtgewicht/ip-codec/-/ip-codec-2.0.4.tgz", - "integrity": "sha512-Hcv+nVC0kZnQ3tD9GVu5xSMR4VVYOteQIr/hwFPVEvPdlXqgGEuRjiheChHgdM+JyqdgNcmzZOX/tnl0JOiI7A==" - }, "node_modules/@lukeed/ms": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/@lukeed/ms/-/ms-2.0.2.tgz", @@ -1711,15 +1705,6 @@ "compact-encoding-net": "^1.2.0" } }, - "node_modules/bonjour-service": { - "version": "1.2.1", - "resolved": "https://registry.npmjs.org/bonjour-service/-/bonjour-service-1.2.1.tgz", - "integrity": "sha512-oSzCS2zV14bh2kji6vNe7vrpJYCHGvcZnlffFQ1MEoX/WOeQ/teD8SYWKR942OI3INjq8OMNJlbPK5LLLUxFDw==", - "dependencies": { - "fast-deep-equal": "^3.1.3", - "multicast-dns": "^7.2.5" - } - }, "node_modules/brace-expansion": { "version": "1.1.11", "license": "MIT", @@ -2514,17 +2499,6 @@ "node": ">=8" } }, - "node_modules/dns-packet": { - "version": "5.6.1", - "resolved": "https://registry.npmjs.org/dns-packet/-/dns-packet-5.6.1.tgz", - "integrity": "sha512-l4gcSouhcgIKRvyy99RNVOgxXiicE+2jZoNmaNmZ6JXiGajBOJAesk1OBlJuM5k2c+eudGdLxDqXuPCKIj6kpw==", - "dependencies": { - "@leichtgewicht/ip-codec": "^2.0.1" - }, - "engines": { - "node": ">=6" - } - }, "node_modules/doctrine": { "version": "3.0.0", "dev": true, @@ -5308,18 +5282,6 @@ "tiny-typed-emitter": "^2.1.0" } }, - "node_modules/multicast-dns": { - "version": "7.2.5", - "resolved": "https://registry.npmjs.org/multicast-dns/-/multicast-dns-7.2.5.tgz", - "integrity": "sha512-2eznPJP8z2BFLX50tf0LuODrpINqP1RVIm/CObbTcBRITQgmC/TjcREF1NeTBzIcR5XO/ukWo+YHOjBbFwIupg==", - "dependencies": { - "dns-packet": "^5.2.2", - "thunky": "^1.0.2" - }, - "bin": { - "multicast-dns": "cli.js" - } - }, "node_modules/mutexify": { "version": "1.4.0", "license": "MIT", @@ -7720,11 +7682,6 @@ "node": ">=12.22" } }, - "node_modules/thunky": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/thunky/-/thunky-1.1.0.tgz", - "integrity": "sha512-eHY7nBftgThBqOyHGVN+l8gF0BucP09fMo0oO/Lb0w1OF80dJv+lDVpXG60WMQvkcxAkNybKsrEIE3ZtKGmPrA==" - }, "node_modules/time-ordered-set": { "version": "1.0.2", "license": "MIT" diff --git a/package.json b/package.json index aea06b489..237c12ebf 100644 --- a/package.json +++ b/package.json @@ -132,7 +132,6 @@ "better-sqlite3": "^8.7.0", "big-sparse-array": "^1.0.3", "bogon": "^1.1.0", - "bonjour-service": "^1.2.1", "compact-encoding": "^2.12.0", "corestore": "^6.8.4", "debug": "^4.3.4", diff --git a/patches/bonjour-service+1.2.1.patch b/patches/bonjour-service+1.2.1.patch deleted file mode 100644 index 2d7053d59..000000000 --- a/patches/bonjour-service+1.2.1.patch +++ /dev/null @@ -1,77 +0,0 @@ -diff --git a/node_modules/bonjour-service/dist/lib/registry.js b/node_modules/bonjour-service/dist/lib/registry.js -index 5462ca2..662486d 100644 ---- a/node_modules/bonjour-service/dist/lib/registry.js -+++ b/node_modules/bonjour-service/dist/lib/registry.js -@@ -15,14 +15,18 @@ class Registry { - this.server = server; - } - publish(config) { -- function start(service, registry, opts) { -+ const configProbe = config.probe !== false; -+ -+ const service = new service_1.default(config, start.bind(null, this), stop.bind(null, this)); -+ -+ function start(registry, { probe = configProbe } = {}) { - if (service.activated) - return; - service.activated = true; - registry.services.push(service); - if (!(service instanceof service_1.default)) - return; -- if (opts === null || opts === void 0 ? void 0 : opts.probe) { -+ if (probe) { - registry.probe(registry.server.mdns, service, (exists) => { - if (exists) { - if (service.stop !== undefined) -@@ -37,7 +41,7 @@ class Registry { - registry.announce(registry.server, service); - } - } -- function stop(service, registry, callback) { -+ function stop(registry, callback) { - if (!callback) - callback = noop; - if (!service.activated) -@@ -49,10 +53,7 @@ class Registry { - if (index !== -1) - registry.services.splice(index, 1); - } -- const service = new service_1.default(config); -- service.start = start.bind(null, service, this); -- service.stop = stop.bind(null, service, this); -- service.start({ probe: config.probe !== false }); -+ service.start(); - return service; - } - unpublishAll(callback) { -diff --git a/node_modules/bonjour-service/dist/lib/service.d.ts b/node_modules/bonjour-service/dist/lib/service.d.ts -index 2c711f9..c459a07 100644 ---- a/node_modules/bonjour-service/dist/lib/service.d.ts -+++ b/node_modules/bonjour-service/dist/lib/service.d.ts -@@ -41,8 +41,8 @@ export declare class Service extends EventEmitter { - published: boolean; - activated: boolean; - destroyed: boolean; -- start?: CallableFunction; -- stop?: CallableFunction; -+ start: CallableFunction; -+ stop: CallableFunction; - private txtService; - constructor(config: ServiceConfig); - records(): Array; -diff --git a/node_modules/bonjour-service/dist/lib/service.js b/node_modules/bonjour-service/dist/lib/service.js -index 41da95a..ac71665 100644 ---- a/node_modules/bonjour-service/dist/lib/service.js -+++ b/node_modules/bonjour-service/dist/lib/service.js -@@ -10,8 +10,10 @@ const events_1 = require("events"); - const service_types_1 = require("./service-types"); - const TLD = '.local'; - class Service extends events_1.EventEmitter { -- constructor(config) { -+ constructor(config, start, stop) { - super(); -+ this.start = start; -+ this.stop = stop; - this.probe = true; - this.published = false; - this.activated = false; diff --git a/src/discovery/dns-sd.js b/src/discovery/dns-sd.js deleted file mode 100644 index 85d38b09c..000000000 --- a/src/discovery/dns-sd.js +++ /dev/null @@ -1,231 +0,0 @@ -import { TypedEmitter } from 'tiny-typed-emitter' -import { Bonjour } from 'bonjour-service' -import pTimeout from 'p-timeout' -import { isIPv4 } from 'node:net' -import { randomBytes } from 'node:crypto' -import { once } from 'node:events' -import { Logger } from '../logger.js' - -const SERVICE_NAME = 'mapeo' - -/** - * @typedef {object} MapeoService - * @property {string} address IPv4 address of service - * @property {number} port - * @property {string} name Instance name - */ - -/** - * @typedef {object} DnsSdEvents - * @property {(service: MapeoService) => void} up - * @property {(service: MapeoService) => void} down - */ - -/** - * @extends {TypedEmitter} - */ -export class DnsSd extends TypedEmitter { - #name - /** @type {import('bonjour-service').Bonjour | null} */ - #bonjour = null - /** @type {import('bonjour-service').Service | null} */ - #service = null - /** @type {import('bonjour-service').Browser | null} */ - #browser = null - /** @type {null | Error} */ - #error = null - /** @param {import('bonjour-service').Service} service */ - #handleServiceUp = (service) => { - if (service.name === this.#name) { - this.#log(`Ignoring service up from self`) - return - } - const address = service.addresses?.find(isIPv4) - /* c8 ignore start */ - if (!address) { - this.#log(`service up (${service.name}) with no ipv4 addresses`) - return - } - /* c8 ignore stop */ - const { name, port } = service - this.#log('serviceUp', name.slice(0, 7), address, port) - this.emit('up', { port, name, address }) - } - /** @param {import('bonjour-service').Service} service */ - #handleServiceDown = (service) => { - if (service.name === this.#name) { - this.#log(`Ignoring service down from self`) - return - } - const address = service.addresses?.find(isIPv4) - /* c8 ignore start */ - if (!address) { - this.#log(`service down (${service.name}) with no ipv4 addresses`) - return - } - /* c8 ignore stop */ - const { name, port } = service - this.#log(`service down`, [name, address, port]) - this.emit('down', { port, name, address }) - } - #disableIpv6 - /** @type {Promise | null} */ - #advertisingStarting = null - /** @type {Promise | null} */ - #advertisingStopping = null - #log - #l - - /** - * - * @param {object} [opts] - * @param {string} [opts.name] - * @param {boolean} [opts.disableIpv6] - * @param {Logger} [opts.logger] - */ - constructor({ name, disableIpv6 = true, logger } = {}) { - super() - this.#l = Logger.create('dnssd', logger) - this.#name = name || randomBytes(8).toString('hex') - this.#disableIpv6 = disableIpv6 - this.#log = this.#l.log.bind(this.#l) - } - - get name() { - return this.#name - } - - /** @param {number} port */ - async advertise(port) { - if (this.#advertisingStopping) { - await this.#advertisingStopping - } - this.#log(`Starting advertising on ${port}`) - if (this.#service && this.#service.port === port) { - if (this.#service.published) { - this.#log(`Already advertising on ${port}`) - return - } - this.#log(`service stopped, starting`) - this.#service.start() - } else { - if (this.#service) { - this.#log(`Stopping previous advertisement on ${this.#service.port}`) - await this.stopAdvertising() - } - const instance = this.#getInstance() - this.#service = instance.publish({ - name: this.#name, - host: this.#name + '.local', - port, - type: SERVICE_NAME, - disableIPv6: this.#disableIpv6, - }) - } - this.#advertisingStarting = once(this.#service, 'up') - await pTimeout(this.#advertisingStarting, { milliseconds: 5000 }) - this.#advertisingStarting = null - this.#log(`Now advertising on ${port}`) - } - - browse() { - if (this.#browser) { - // @ts-ignore - using private property as a check for whether browser is already started - if (this.#browser.onresponse) { - this.#log(`browser already started, updating`) - this.#browser.update() - return - } - this.#browser.start() - } else { - const instance = this.#getInstance() - this.#browser = instance.find({ - type: SERVICE_NAME, - }) - } - this.#browser.on('up', this.#handleServiceUp) - this.#browser.on('down', this.#handleServiceDown) - } - - async stopAdvertising() { - this.#log(`stopping advertising`) - if (this.#advertisingStarting) { - await this.#advertisingStarting - } - const service = this.#service - if (!service) return - this.#advertisingStopping = /** @type {Promise} */ ( - new Promise((res) => { - service.stop(res) - }) - ) - await this.#advertisingStopping - this.#advertisingStopping = null - this.#log(`stopped advertising`) - } - - stopBrowsing() { - if (!this.#browser) return - this.#browser.removeListener('up', this.#handleServiceUp) - this.#browser.removeListener('down', this.#handleServiceDown) - this.#browser.stop() - } - - async destroy() { - this.#log(`destroying`) - const bonjour = this.#bonjour - if (!bonjour) return - this.#bonjour = null - this.stopBrowsing() - this.#browser = null - const service = this.#service - this.#service = null - await /** @type {Promise} */ ( - new Promise((res) => { - bonjour.unpublishAll(res) - }) - ) - this.#log(`all services unpublished`) - await /** @type {Promise} */ ( - new Promise((res) => { - if (!service) return res() - service.stop(res) - }) - ) - this.#log(`stopped advertising`) - await new Promise((res) => { - bonjour.destroy(res) - }) - this.#log(`destroyed`) - } - - /** - * Lazily get a Bonjour instance. Previous errors are cleared - * - * @returns {Bonjour} - */ - #getInstance() { - if (this.#bonjour) { - // tests don't replicate an error here yet - /* c8 ignore start */ - if (this.#error) { - // Don't allow advertise / browse when the instance has errored - throw this.#error - } - /* c8 ignore stop */ - return this.#bonjour - } - this.#error = null - this.#bonjour = new Bonjour( - undefined, - // Tests don't replicate error here yet - /* c8 ignore start */ - (/** @type {any} */ error) => { - // TODO: Logging - this.#error = error - } - /* c8 ignore stop */ - ) - return this.#bonjour - } -} diff --git a/src/discovery/local-discovery.js b/src/discovery/local-discovery.js index 33ea5203b..d498788b0 100644 --- a/src/discovery/local-discovery.js +++ b/src/discovery/local-discovery.js @@ -1,9 +1,9 @@ import { TypedEmitter } from 'tiny-typed-emitter' import net from 'node:net' +import { randomBytes } from 'node:crypto' import NoiseSecretStream from '@hyperswarm/secret-stream' import { once } from 'node:events' import { noop } from '../utils.js' -import { DnsSd } from './dns-sd.js' import { isPrivate } from 'bogon' import StartStopStateMachine from 'start-stop-state-machine' import pTimeout from 'p-timeout' @@ -25,10 +25,10 @@ export const ERR_DUPLICATE = 'Duplicate connection' */ export class LocalDiscovery extends TypedEmitter { #identityKeypair + #name = randomBytes(8).toString('hex') #server /** @type {Map} */ #noiseConnections = new Map() - #dnssd #sm #log /** @type {(e: Error) => void} */ @@ -38,20 +38,12 @@ export class LocalDiscovery extends TypedEmitter { /** * @param {Object} opts * @param {Keypair} opts.identityKeypair - * @param {DnsSd} [opts.dnssd] Optional DnsSd instance, used for testing * @param {Logger} [opts.logger] */ - constructor({ identityKeypair, dnssd, logger }) { + constructor({ identityKeypair, logger }) { super() - this.#l = Logger.create('mdns', logger) + this.#l = Logger.create('LocalDiscovery', logger) this.#log = this.#l.log.bind(this.#l) - this.#dnssd = - dnssd || - new DnsSd({ - name: keyToPublicId(identityKeypair.publicKey), - logger: this.#l, - }) - this.#dnssd.on('up', this.#handleServiceUp.bind(this)) this.#sm = new StartStopStateMachine({ start: this.#start.bind(this), stop: this.#stop.bind(this), @@ -74,30 +66,30 @@ export class LocalDiscovery extends TypedEmitter { return this.#server.address() } + /** @returns {Promise<{ name: string, port: number }>} */ async start() { - return this.#sm.start() + await this.#sm.start() + return { name: this.#name, port: getAddress(this.#server).port } } async #start() { - // start browsing straight away - this.#dnssd.browse() - // Let OS choose port, listen on ip4, all interfaces this.#server.listen(0, '0.0.0.0') await once(this.#server, 'listening') const addr = getAddress(this.#server) this.#log('server listening on port ' + addr.port) - await this.#dnssd.advertise(addr.port) - this.#log('advertising service on port ' + addr.port) } /** - * - * @param {import('./dns-sd.js').MapeoService} service - * @returns + * @param {object} peer + * @param {string} peer.address + * @param {number} peer.port + * @param {string} peer.name + * @returns {void} */ - #handleServiceUp({ address, port, name }) { - this.#log('serviceUp', name.slice(0, 7), address, port) + connectPeer({ address, port, name }) { + if (this.#name === name) return + this.#log('peer connected', name.slice(0, 7), address, port) if (this.#noiseConnections.has(name)) { this.#log(`Already connected to ${name.slice(0, 7)}`) return @@ -271,7 +263,6 @@ export class LocalDiscovery extends TypedEmitter { this.#log('stopping') const { port } = getAddress(this.#server) this.#server.close() - const destroyPromise = this.#dnssd.destroy() const closePromise = once(this.#server, 'close') await pTimeout(closePromise, { milliseconds: force ? (timeout === 0 ? 1 : timeout) : Infinity, @@ -282,7 +273,6 @@ export class LocalDiscovery extends TypedEmitter { return pTimeout(closePromise, { milliseconds: 500 }) }, }) - await destroyPromise this.#log(`stopped for ${port}`) } } diff --git a/src/mapeo-manager.js b/src/mapeo-manager.js index 554c1df9b..e299fa505 100644 --- a/src/mapeo-manager.js +++ b/src/mapeo-manager.js @@ -731,15 +731,21 @@ export class MapeoManager extends TypedEmitter { return this.#invite } - async startLocalPeerDiscovery() { + /** @returns {Promise<{ name: string, port: number }>} */ + startLocalPeerDiscoveryServer() { return this.#localDiscovery.start() } /** @type {LocalDiscovery['stop']} */ - async stopLocalPeerDiscovery(opts) { + stopLocalPeerDiscoveryServer(opts) { return this.#localDiscovery.stop(opts) } + /** @type {LocalDiscovery['connectPeer']} */ + connectPeer(peer) { + this.#localDiscovery.connectPeer(peer) + } + /** * @returns {Promise} */ diff --git a/test-e2e/utils.js b/test-e2e/utils.js index 3fd559836..0e517bb54 100644 --- a/test-e2e/utils.js +++ b/test-e2e/utils.js @@ -28,7 +28,7 @@ const clientMigrationsFolder = new URL('../drizzle/client', import.meta.url) export async function disconnectPeers(managers) { await Promise.all( managers.map(async (manager) => { - return manager.stopLocalPeerDiscovery({ force: true }) + return manager.stopLocalPeerDiscoveryServer({ force: true }) }) ) } @@ -39,7 +39,12 @@ export async function disconnectPeers(managers) { export function connectPeers(managers, { discovery = true } = {}) { if (discovery) { for (const manager of managers) { - manager.startLocalPeerDiscovery() + manager.startLocalPeerDiscoveryServer().then(({ name, port }) => { + for (const otherManager of managers) { + if (otherManager === manager) continue + otherManager.connectPeer({ address: '127.0.0.1', name, port }) + } + }) } return function destroy() { return disconnectPeers(managers) diff --git a/tests/discovery/dns-sd.js b/tests/discovery/dns-sd.js deleted file mode 100644 index ce9f10c21..000000000 --- a/tests/discovery/dns-sd.js +++ /dev/null @@ -1,202 +0,0 @@ -import { DnsSd } from '../../src/discovery/dns-sd.js' -import test from 'brittle' -import { setTimeout as delay } from 'node:timers/promises' - -// Time in ms to wait for mdns messages to propogate -const MDNS_WAIT_TIME = 1000 - -test('dns-sd multiple clients, 2000ms staggered start/stop', async (t) => { - await testMultiple(t, { period: 2000 }) -}) - -test('dns-sd multiple clients, immediate start/stop', async (t) => { - await testMultiple(t, { period: 0 }) -}) - -test('Calling advertise() multiple times with same port is a noop', async (t) => { - const dnssd1 = new DnsSd() - const dnssd2 = new DnsSd() - const ups = [] - dnssd1.on('up', (service) => ups.push(service.name)) - dnssd1.browse() - await delay(500) - await dnssd2.advertise(5001) - await delay(500) - await dnssd2.advertise(5001) - await delay(500) - t.alike(ups, [dnssd2.name]) - await Promise.all([dnssd1.destroy(), dnssd2.destroy()]) -}) - -test('Calling browse() multiple times is a noop', async (t) => { - const dnssd1 = new DnsSd() - const dnssd2 = new DnsSd() - const ups = [] - dnssd1.on('up', (service) => ups.push(service.name)) - dnssd1.browse() - await dnssd2.advertise(5001) - await delay(500) - dnssd1.browse(5001) - await delay(500) - t.alike(ups, [dnssd2.name]) - await Promise.all([dnssd1.destroy(), dnssd2.destroy()]) -}) - -test('Calling advertise() multiple times with a different port republishes the service', async (t) => { - const dnssd1 = new DnsSd() - const dnssd2 = new DnsSd() - const ups = [] - const downs = [] - dnssd1.on('up', (service) => ups.push(service.port)) - dnssd1.on('down', (service) => downs.push(service.port)) - dnssd1.browse() - await delay(500) - await dnssd2.advertise(5001) - await delay(500) - await dnssd2.advertise(5002) - await delay(500) - t.alike(ups, [5001, 5002]) - t.alike(downs, [5001]) - await Promise.all([dnssd1.destroy(), dnssd2.destroy()]) -}) - -test('Can stop and start advertising and browsing (change advertise port)', async (t) => { - const dnssd1 = new DnsSd() - const dnssd2 = new DnsSd() - const ups = [] - dnssd1.on('up', (service) => ups.push(service.port)) - dnssd1.browse() - await dnssd2.advertise(5001) - await delay(500) - await dnssd2.stopAdvertising() - dnssd1.stopBrowsing() - dnssd1.browse() - await dnssd2.advertise(5002) - await delay(500) - t.alike(ups, [5001, 5002]) - await Promise.all([dnssd1.destroy(), dnssd2.destroy()]) -}) - -test('Can stop and start advertising on same port', async (t) => { - const dnssd1 = new DnsSd() - const dnssd2 = new DnsSd() - const ups = [] - const downs = [] - dnssd1.on('up', (service) => ups.push(service.port)) - dnssd1.on('down', (service) => downs.push(service.port)) - dnssd1.browse() - await dnssd2.advertise(5001) - await delay(500) - await dnssd2.stopAdvertising() - dnssd1.stopBrowsing() - dnssd1.browse() - await dnssd2.advertise(5001) - await delay(500) - await dnssd2.stopAdvertising() - await delay(500) - t.alike(ups, [5001, 5001]) - t.alike(downs, [5001, 5001]) - await Promise.all([dnssd1.destroy(), dnssd2.destroy()]) -}) - -test('After destroy, can advertise and browse', async (t) => { - const dnssd1 = new DnsSd() - const dnssd2 = new DnsSd() - const ups = [] - dnssd1.on('up', (service) => ups.push(service.port)) - dnssd1.browse() - await dnssd2.advertise(5001) - await delay(500) - await Promise.all([dnssd1.destroy(), dnssd2.destroy()]) - dnssd1.browse() - await dnssd2.advertise(5002) - await delay(500) - t.alike(ups, [5001, 5002]) - await Promise.all([dnssd1.destroy(), dnssd2.destroy()]) -}) - -test('can call stopAdvertising immediately after advertise()', async (t) => { - t.plan(1) - const dnssd = new DnsSd() - const startAdvertising = dnssd.advertise(5001) - await dnssd.stopAdvertising() - await startAdvertising - await dnssd.destroy() - t.pass(`Did not timeout`) -}) - -test('can call advertise() immediately after stopAdvertise()', async (t) => { - const dnssd1 = new DnsSd() - const dnssd2 = new DnsSd() - const ups = [] - dnssd1.on('up', (service) => ups.push(service.port)) - dnssd1.browse() - await dnssd2.advertise(5001) - await delay(500) - const stopAdvertising = dnssd2.stopAdvertising() - await dnssd2.advertise(5001) - await stopAdvertising - await delay(500) - t.alike(ups, [5001, 5001]) - await Promise.all([dnssd1.destroy(), dnssd2.destroy()]) -}) - -test('calling destroy() before anything else is a noop', async (t) => { - t.plan(1) - const dnssd1 = new DnsSd() - await dnssd1.destroy() - t.pass() -}) - -/** - * @param {any} t - * @param {object} opts - * @param {number} opts.period Delay for starting and stopping services - start and stop will happen at a random time within this period - * @param {number} [opts.count] Number of instances to create and browse and advertise (default 20) - */ -async function testMultiple(t, { period, count = 20 }) { - t.plan(count * 2 + 1) - const instances = new Map() - const serviceUps = new Map() - const serviceDowns = new Map() - for (let i = 0; i < count; i++) { - const dnsSd = new DnsSd() - instances.set(dnsSd.name, dnsSd) - const ups = [] - const downs = [] - serviceUps.set(dnsSd.name, ups) - serviceDowns.set(dnsSd.name, downs) - dnsSd.on('up', (service) => ups.push(service.name)) - dnsSd.on('down', (service) => downs.push(service.name)) - // Start advertising and browsing at a random time within the first `delay` - // milliseconds, then wait for MDNS_WAIT_TIME before then stopping browsing - // at a random time within `delay` - setTimeout(() => dnsSd.advertise(5000 + i), period * Math.random()) - setTimeout(() => dnsSd.browse(), period * Math.random()) - setTimeout( - () => dnsSd.stopAdvertising(), - MDNS_WAIT_TIME + period + period * Math.random() - ) - } - await delay(2 * (period + MDNS_WAIT_TIME)) - const instanceNames = [...instances.keys()] - for (const name of instanceNames) { - const expected = instanceNames.filter((n) => n !== name).sort() - t.alike( - serviceUps.get(name).sort(), - expected, - `${name} received 'up' from all ${expected.length} other instances` - ) - t.alike( - serviceDowns.get(name).sort(), - expected, - `${name} received 'down' from all ${expected.length} other instances` - ) - } - const destroyPromises = [] - for (const instance of instances.values()) { - destroyPromises.push(instance.destroy()) - } - await Promise.all(destroyPromises) - t.pass('All instances destroyed') -} diff --git a/tests/discovery/local-discovery.js b/tests/discovery/local-discovery.js index 529b9fe25..31d1fe3d0 100644 --- a/tests/discovery/local-discovery.js +++ b/tests/discovery/local-discovery.js @@ -1,6 +1,7 @@ import test from 'brittle' import { randomBytes } from 'node:crypto' import net from 'node:net' +import { every } from 'iterpal' import { KeyManager } from '@mapeo/crypto' import { setTimeout as delay } from 'node:timers/promises' import pDefer from 'p-defer' @@ -11,40 +12,45 @@ import { } from '../../src/discovery/local-discovery.js' import NoiseSecretStream from '@hyperswarm/secret-stream' -test('mdns - discovery and sharing of data', (t) => { +test('peer discovery - discovery and sharing of data', async (t) => { const deferred = pDefer() const identityKeypair1 = new KeyManager(randomBytes(16)).getIdentityKeypair() const identityKeypair2 = new KeyManager(randomBytes(16)).getIdentityKeypair() - const mdnsDiscovery1 = new LocalDiscovery({ + const localDiscovery1 = new LocalDiscovery({ identityKeypair: identityKeypair1, }) - const mdnsDiscovery2 = new LocalDiscovery({ + const localDiscovery2 = new LocalDiscovery({ identityKeypair: identityKeypair2, }) const str = 'hi' - mdnsDiscovery1.on('connection', (stream) => { + localDiscovery1.on('connection', (stream) => { stream.on('error', handleConnectionError.bind(null, t)) stream.write(str) }) - mdnsDiscovery2.on('connection', (stream) => { + localDiscovery2.on('connection', (stream) => { stream.on('error', handleConnectionError.bind(null, t)) stream.on('data', (d) => { t.is(d.toString(), str, 'expected data written') - Promise.all([ - mdnsDiscovery1.stop({ force: true }), - mdnsDiscovery2.stop({ force: true }), - ]).then(() => { - t.pass('teardown complete') - deferred.resolve() - }) + deferred.resolve() }) }) - mdnsDiscovery1.start() - mdnsDiscovery2.start() + t.teardown(() => + Promise.all([ + localDiscovery1.stop({ force: true }), + localDiscovery2.stop({ force: true }), + ]) + ) + const [server1, server2] = await Promise.all([ + localDiscovery1.start(), + localDiscovery2.start(), + ]) + + localDiscovery1.connectPeer({ address: '127.0.0.1', ...server2 }) + localDiscovery2.connectPeer({ address: '127.0.0.1', ...server1 }) return deferred.promise }) @@ -83,11 +89,11 @@ test('deduplicate incoming connections', async (t) => { await discovery.stop({ force: true }) }) -test(`mdns - discovery of 30 peers with random time instantiation`, async (t) => { +test(`peer discovery of 30 peers with random connection times`, async (t) => { await testMultiple(t, { period: 2000, nPeers: 30 }) }) -test(`mdns - discovery of 30 peers instantiated at the same time`, async (t) => { +test(`peer discovery of 30 peers connected at the same time`, async (t) => { await testMultiple(t, { period: 0, nPeers: 30 }) }) @@ -111,10 +117,21 @@ async function noiseConnect({ port, address }, keyPair) { async function testMultiple(t, { period, nPeers = 20 }) { const peersById = new Map() const connsById = new Map() - const promises = [] // t.plan(3 * nPeers + 1) - async function spawnPeer(onConnected) { + const { promise: fullyConnectedPromise, resolve: onFullyConnected } = pDefer() + + const onConnection = () => { + const isFullyConnected = every( + connsById.values(), + (conns) => conns.length >= nPeers - 1 + ) + if (isFullyConnected) onFullyConnected() + } + + /** @type {LocalDiscovery[]} */ + const peers = [] + for (let i = 0; i < nPeers; i++) { const identityKeypair = new KeyManager(randomBytes(16)).getIdentityKeypair() const discovery = new LocalDiscovery({ identityKeypair }) const peerId = keyToPublicId(discovery.publicKey) @@ -124,20 +141,30 @@ async function testMultiple(t, { period, nPeers = 20 }) { discovery.on('connection', (conn) => { conn.on('error', handleConnectionError.bind(null, t)) conns.push(conn) - if (conns.length >= nPeers - 1) onConnected() + onConnection() }) - await discovery.start() - return discovery + peers.push(discovery) } - for (let p = 0; p < nPeers; p++) { - const deferred = pDefer() - promises.push(deferred.promise) - setTimeout(spawnPeer, Math.floor(Math.random() * period), deferred.resolve) + const servers = await Promise.all( + peers.map(async (peer) => { + const result = await peer.start() + t.teardown(() => peer.stop({ force: true })) + return result + }) + ) + + for (const [peerIndex, peer] of peers.entries()) { + for (const [serverIndex, server] of servers.entries()) { + if (peerIndex === serverIndex) continue + delay(Math.floor(Math.random() * period)).then(() => { + peer.connectPeer({ address: '127.0.0.1', ...server }) + }) + } } // Wait for all peers to connect to at least nPeers - 1 peers (every other peer) - await Promise.all(promises) + await fullyConnectedPromise // Wait another 1000ms for any deduplication await delay(1000) @@ -158,13 +185,6 @@ async function testMultiple(t, { period, nPeers = 20 }) { } other peers` ) } - - const stopPromises = [] - for (const discovery of peersById.values()) { - stopPromises.push(discovery.stop({ force: true })) - } - await Promise.all(stopPromises) - t.pass('teardown complete') } function handleConnectionError(t, e) {