diff --git a/package-lock.json b/package-lock.json index 89ad0c5c7..7ebff9fea 100644 --- a/package-lock.json +++ b/package-lock.json @@ -41,6 +41,7 @@ "mime": "^4.0.1", "multi-core-indexer": "^1.0.0-alpha.9", "p-defer": "^4.0.0", + "p-event": "^6.0.1", "p-timeout": "^6.1.2", "patch-package": "^8.0.0", "protobufjs": "^7.2.3", @@ -80,6 +81,7 @@ "drizzle-kit": "^0.19.12", "eslint": "^8.57.0", "husky": "^8.0.0", + "iterpal": "^0.3.0", "light-my-request": "^5.10.0", "lint-staged": "^14.0.1", "mapeo-offline-map": "^2.0.0", @@ -2205,6 +2207,33 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/cp-file/node_modules/p-event": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/p-event/-/p-event-5.0.1.tgz", + "integrity": "sha512-dd589iCQ7m1L0bmC5NLlVYfy3TbBEsMUfWx9PyAgPeIcFZ/E2yaTZ4Rz4MiBmmJShviiftHVXOqfnfzJ6kyMrQ==", + "dev": true, + "dependencies": { + "p-timeout": "^5.0.2" + }, + "engines": { + "node": "^12.20.0 || ^14.13.1 || >=16.0.0" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/cp-file/node_modules/p-timeout": { + "version": "5.1.0", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-5.1.0.tgz", + "integrity": "sha512-auFDyzzzGZZZdHz3BtET9VEz0SE/uMEAx7uWfGPucfzEwwe/xH0iVeZibQmANYE/hp9T2+UUZT5m+BKyrDp3Ew==", + "dev": true, + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/cpy": { "version": "10.1.0", "dev": true, @@ -4409,6 +4438,15 @@ "node": ">=8" } }, + "node_modules/iterpal": { + "version": "0.3.0", + "resolved": "https://registry.npmjs.org/iterpal/-/iterpal-0.3.0.tgz", + "integrity": "sha512-s7fZOi2hrko1W96PVV5dYDZs1KJ3yi18ahSe3xEcBEcc6BaWSPS72eFcDhVrJS/D7ShcaOxPzY3gS/4pw9qK2w==", + "dev": true, + "engines": { + "node": ">=18.0.0" + } + }, "node_modules/jackspeak": { "version": "2.3.6", "resolved": "https://registry.npmjs.org/jackspeak/-/jackspeak-2.3.6.tgz", @@ -5776,25 +5814,14 @@ } }, "node_modules/p-event": { - "version": "5.0.1", - "dev": true, - "license": "MIT", + "version": "6.0.1", + "resolved": "https://registry.npmjs.org/p-event/-/p-event-6.0.1.tgz", + "integrity": "sha512-Q6Bekk5wpzW5qIyUP4gdMEujObYstZl6DMMOSenwBvV0BlE5LkDwkjs5yHbZmdCEq2o4RJx4tE1vwxFVf2FG1w==", "dependencies": { - "p-timeout": "^5.0.2" - }, - "engines": { - "node": "^12.20.0 || ^14.13.1 || >=16.0.0" + "p-timeout": "^6.1.2" }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } - }, - "node_modules/p-event/node_modules/p-timeout": { - "version": "5.1.0", - "dev": true, - "license": "MIT", "engines": { - "node": ">=12" + "node": ">=16.17" }, "funding": { "url": "https://github.com/sponsors/sindresorhus" diff --git a/package.json b/package.json index 3ee65691d..3d636c4fd 100644 --- a/package.json +++ b/package.json @@ -89,6 +89,7 @@ "drizzle-kit": "^0.19.12", "eslint": "^8.57.0", "husky": "^8.0.0", + "iterpal": "^0.3.0", "light-my-request": "^5.10.0", "lint-staged": "^14.0.1", "mapeo-offline-map": "^2.0.0", @@ -139,6 +140,7 @@ "mime": "^4.0.1", "multi-core-indexer": "^1.0.0-alpha.9", "p-defer": "^4.0.0", + "p-event": "^6.0.1", "p-timeout": "^6.1.2", "patch-package": "^8.0.0", "protobufjs": "^7.2.3", diff --git a/src/invite-api.js b/src/invite-api.js index eb3496fe7..c635f876d 100644 --- a/src/invite-api.js +++ b/src/invite-api.js @@ -1,7 +1,8 @@ // @ts-check import { TypedEmitter } from 'tiny-typed-emitter' +import { pEvent } from 'p-event' import { InviteResponse_Decision } from './generated/rpc.js' -import { assert, keyToId, onceSatisfied, noop } from './utils.js' +import { assert, keyToId, noop } from './utils.js' import HashMap from './lib/hashmap.js' import timingSafeEqual from './lib/timing-safe-equal.js' @@ -251,19 +252,18 @@ export class InviteApi extends TypedEmitter { const projectDetailsAbortController = new AbortController() const projectDetailsPromise = - /** @type {typeof onceSatisfied, 'got-project-details'>} */ ( - onceSatisfied - )( - this.rpc, - 'got-project-details', - (projectDetailsPeerId, details) => + /** @type {typeof pEvent<'got-project-details', [string, ProjectJoinDetails]>} */ ( + pEvent + )(this.rpc, 'got-project-details', { + multiArgs: true, + filter: ([projectDetailsPeerId, details]) => // This peer ID check is probably superfluous because the invite ID // should be unguessable, but might be useful if someone forwards an // invite message (or if there's an unforeseen bug). timingSafeEqual(projectDetailsPeerId, peerId) && timingSafeEqual(inviteId, details.inviteId), - { signal: projectDetailsAbortController.signal } - ) + signal: projectDetailsAbortController.signal, + }) .then((args) => args?.[1]) .catch(noop) diff --git a/src/member-api.js b/src/member-api.js index 4767fa8fd..70ed1177f 100644 --- a/src/member-api.js +++ b/src/member-api.js @@ -1,11 +1,11 @@ import * as crypto from 'node:crypto' import { TypedEmitter } from 'tiny-typed-emitter' +import { pEvent } from 'p-event' import { InviteResponse_Decision } from './generated/rpc.js' import { assert, noop, ExhaustivenessError, - onceSatisfied, projectKeyToId, projectKeyToPublicId, } from './utils.js' @@ -180,16 +180,15 @@ export class MemberApi extends TypedEmitter { const timeoutId = setTimeout(() => abortController.abort(), timeout) const responsePromise = - /** @type {typeof onceSatisfied, 'invite-response'>} */ ( - onceSatisfied - )( - this.#rpc, - 'invite-response', - (peerId, inviteResponse) => + /** @type {typeof pEvent<'invite-response', [string, InviteResponse]>} */ ( + pEvent + )(this.#rpc, 'invite-response', { + multiArgs: true, + filter: ([peerId, inviteResponse]) => timingSafeEqual(peerId, deviceId) && timingSafeEqual(invite.inviteId, inviteResponse.inviteId), - { signal: abortController.signal } - ).then((args) => args?.[1]) + signal: abortController.signal, + }).then((args) => args?.[1]) responsePromise.catch(noop) diff --git a/src/utils.js b/src/utils.js index 8dd67a01e..1c41d09a5 100644 --- a/src/utils.js +++ b/src/utils.js @@ -110,111 +110,6 @@ export function setHas(set) { return (value) => set.has(/** @type {*} */ (value)) } -/** - * @internal - * @template {import('tiny-typed-emitter').ListenerSignature} L - * @typedef {import('tiny-typed-emitter').TypedEmitter} TypedEmitter - */ - -/** - * @internal - * @template {TypedEmitter} T - * @typedef {import('./utils_types.d.ts').TypedEvents} TypedEvents - */ - -/** - * @internal - * @template {TypedEmitter} T - * @typedef {import('./utils_types.d.ts').TypedEventsFor} TypedEventsFor - */ - -/** - * @internal - * @template {TypedEmitter} Emitter - * @template {TypedEventsFor} Event - * @typedef {import('./utils_types.d.ts').TypedEventArgs} TypedEventArgs - */ - -/** - * Like `once`, but only resolves after the event's arguments satisfy `check`. - * Useful when you want to listen for a particular instance of an event. - * - * Due to an unfortunate TypeScript quirk, you need to manually specify the - * generic types to use this function with `TypedEmitter` subclasses. For - * example: - * - * ``` - * const res = await onceSatisfied, 'my-event'>( - * myEmitterSubclass, - * 'my-event' - * ) - * ``` - * - * It would be great to remove this verbosity, but it is the only way to achieve - * type safety. Luckily, you'll get a type error if you do it incorrectly. - * - * @template {TypedEmitter} Emitter - * @template {TypedEventsFor} Event - * @param {Emitter} emitter A `tiny-typed-emitter` event emitter. - * @param {Event} eventName The event to listen to. - * @param {(...args: TypedEventArgs) => boolean} check - * Called with the event's arguments. If this function returns `false`, the - * event will be ignored. If this function returns `true`, the promise will - * resolve with these arguments and the event listener will be cleaned up. - * @param {object} [options] - * @param {AbortSignal} [options.signal] An `AbortSignal`. If this signal is - * aborted, the promise will resolve with `null`. - * @returns {Promise>} Resolves with the - * event arguments that satisfy `check`, or `null` if the aborted. - */ -export const onceSatisfied = (emitter, eventName, check, { signal } = {}) => - new Promise((res, rej) => { - /** @type {() => void} */ - let cleanup - if (signal) { - if (signal.aborted) { - rej(signal.reason) - return - } - - const onAbort = () => { - cleanup() - rej(signal.reason) - } - signal.addEventListener('abort', onAbort, { once: true }) - - cleanup = () => { - emitter.off(eventName, onEvent) - signal.removeEventListener('abort', onAbort) - } - } else { - cleanup = () => { - emitter.off(eventName, onEvent) - } - } - - /** @param {TypedEventArgs} args */ - const onEvent = (...args) => { - /** @type {unknown} */ - let err - - try { - if (!check(...args)) return - } catch (e) { - err = e - } - - cleanup() - if (err) { - rej(err) - } else { - res(args) - } - } - - emitter.on(eventName, onEvent) - }) - /** * When reading from SQLite, any optional properties are set to `null`. This * converts `null` back to `undefined` to match the input types (e.g. the types diff --git a/tests/helpers/events.js b/tests/helpers/events.js index f0f8fcbae..b8d63b2e8 100644 --- a/tests/helpers/events.js +++ b/tests/helpers/events.js @@ -1,56 +1,22 @@ // @ts-check -import { assert, onceSatisfied } from '../../src/utils.js' - -/** - * @internal - * @template {import('tiny-typed-emitter').ListenerSignature} L - * @typedef {import('tiny-typed-emitter').TypedEmitter} TypedEmitter - */ - -/** - * @internal - * @template {TypedEmitter} T - * @typedef {import('../../src/utils_types.d.ts').TypedEvents} TypedEvents - */ - -/** - * @internal - * @template {TypedEmitter} T - * @typedef {import('../../src/utils_types.d.ts').TypedEventsFor} TypedEventsFor - */ - -/** - * @internal - * @template {TypedEmitter} Emitter - * @template {TypedEventsFor} Event - * @typedef {import('../../src/utils_types.d.ts').TypedEventArgs} TypedEventArgs - */ +import { assert } from '../../src/utils.js' +import { pEventIterator } from 'p-event' +import { arrayFrom } from 'iterpal' /** * Like `once`, but listens to events up to a certain number of times. * - * @template {TypedEmitter} Emitter - * @template {TypedEventsFor} Event - * @param {Emitter} emitter - * @param {Event} eventName + * @param {import('node:events').EventEmitter} emitter + * @param {string | symbol} eventName * @param {number} count - * @returns {Promise[]>} + * @returns {Promise} */ -export async function onTimes(emitter, eventName, count) { +export function onTimes(emitter, eventName, count) { assert( Number.isSafeInteger(count) && count >= 0, 'onTimes called with an invalid count' ) - /** @type {TypedEventArgs[]} */ - const result = [] - - if (count === 0) return result - - await onceSatisfied(emitter, eventName, (...args) => { - result.push(args) - return result.length === count - }) - - return result + const events = pEventIterator(emitter, eventName, { limit: count }) + return arrayFrom(events) } diff --git a/tests/invite-api.js b/tests/invite-api.js index 88f97e752..b5f354e79 100644 --- a/tests/invite-api.js +++ b/tests/invite-api.js @@ -101,11 +101,7 @@ test('invite-received event has expected payload', async (t) => { }, ] const receivedInvitesArgs = await invitesReceivedPromise - t.alike( - receivedInvitesArgs, - expectedInvites.map((i) => [i]), - 'received expected invites' - ) + t.alike(receivedInvitesArgs, expectedInvites, 'received expected invites') t.alike(inviteApi.getPending(), expectedInvites) }) @@ -492,7 +488,7 @@ test('Receiving invite for project that peer already belongs to', async (t) => { 'got expected responses' ) - const removedInvites = (await invitesRemovedPromise).map((args) => args[0]) + const removedInvites = await invitesRemovedPromise const allButLastRemoved = removedInvites.slice(0, -1) const lastRemoved = removedInvites[removedInvites.length - 1] t.alike( diff --git a/tests/utils.js b/tests/utils.js index 44716537f..e80a19537 100644 --- a/tests/utils.js +++ b/tests/utils.js @@ -1,12 +1,6 @@ // @ts-check import test from 'brittle' -import { - assert, - ExhaustivenessError, - setHas, - onceSatisfied, -} from '../src/utils.js' -import { TypedEmitter } from 'tiny-typed-emitter' +import { assert, ExhaustivenessError, setHas } from '../src/utils.js' test('assert()', (t) => { t.execution(() => assert(true, 'should work')) @@ -33,89 +27,3 @@ test('setHas()', (t) => { t.ok(setHas(set)(1)) t.absent(setHas(set)(9)) }) - -test('onceSatisfied() resolves when the check matches', async (t) => { - /** @type {TypedEmitter<{ e: (a: string, b: number) => void }>} */ - const emitter = new TypedEmitter() - - let checkCallCount = 0 - /** @type {Promise<[string, number]>} */ - const promise = onceSatisfied(emitter, 'e', (a, b) => { - checkCallCount++ - return a.toUpperCase() === 'A' && b > 0 - }) - emitter.emit('e', 'x', 3) - emitter.emit('e', 'y', 2) - emitter.emit('e', 'a', 1) - t.alike(await promise, ['a', 1], 'resolves with the right args') - - emitter.emit('e', 'z', 0) - emitter.emit('e', 'a', 9) - t.is(checkCallCount, 3, 'stops listening after resolving') -}) - -test('onceSatisfied() handles events with no arguments', async (t) => { - /** @type {TypedEmitter<{ e: () => void }>} */ - const emitter = new TypedEmitter() - - let checkCallCount = 0 - /** @type {Promise<[]>} */ - const promise = onceSatisfied(emitter, 'e', (...args) => { - t.is(args.length, 0, 'checker gets no args') - checkCallCount++ - return checkCallCount === 3 - }) - emitter.emit('e') - emitter.emit('e') - emitter.emit('e') - t.alike(await promise, [], 'resolves with no args') -}) - -test('onceSatisfied() abort', async (t) => { - /** @type {TypedEmitter<{ e: () => void }>} */ - const emitter = new TypedEmitter() - - const abortController = new AbortController() - const promise = onceSatisfied( - emitter, - 'e', - () => { - t.fail('should never be called') - return false - }, - { signal: abortController.signal } - ) - abortController.abort() - await t.exception(promise, 'rejects when aborted') -}) - -test('onceSatisfied() abort before starting', async (t) => { - /** @type {TypedEmitter<{ e: () => void }>} */ - const emitter = new TypedEmitter() - - const abortedSignal = AbortSignal.abort() - await t.exception( - () => - onceSatisfied( - emitter, - 'e', - () => { - t.fail('should never be called') - return false - }, - { signal: abortedSignal } - ), - 'rejects immediately' - ) -}) - -test('onceSatisfied() rejects if check function rejects', async (t) => { - /** @type {TypedEmitter<{ e: () => void }>} */ - const emitter = new TypedEmitter() - - const promise = onceSatisfied(emitter, 'e', () => { - throw new Error('check fails') - }) - emitter.emit('e') - await t.exception(promise, 'rejects when check throws') -})