Skip to content

Commit

Permalink
Merge branch 'main' into fix-typo-in-secretstream-log-message
Browse files Browse the repository at this point in the history
  • Loading branch information
EvanHahn authored Sep 10, 2024
2 parents 29df1d0 + 839a0ad commit c4b47e3
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 32 deletions.
6 changes: 3 additions & 3 deletions src/discovery/local-discovery.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import StartStopStateMachine from 'start-stop-state-machine'
import pTimeout from 'p-timeout'
import { keyToPublicId } from '@mapeo/crypto'
import { Logger } from '../logger.js'
/** @import { OpenedNoiseStream } from '../utils.js' */
/** @import { OpenedNoiseStream } from '../lib/noise-secret-stream-helpers.js' */

/** @typedef {{ publicKey: Buffer, secretKey: Buffer }} Keypair */
/** @typedef {OpenedNoiseStream<net.Socket>} OpenedNetNoiseStream */
Expand Down Expand Up @@ -113,9 +113,9 @@ export class LocalDiscovery extends TypedEmitter {
socket.off('error', this.#handleSocketError)
socket.on('error', onSocketError)

/** @param {any} e */
/** @param {Error} e */
function onSocketError(e) {
if (e.code === 'EPIPE') {
if ('code' in e && e.code === 'EPIPE') {
socket.destroy()
if (secretStream) {
secretStream.destroy()
Expand Down
37 changes: 37 additions & 0 deletions src/lib/noise-secret-stream-helpers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/** @import { Duplex as NodeDuplex } from 'node:stream' */
/** @import { Duplex as StreamxDuplex } from 'streamx' */
/** @import NoiseSecretStream from '@hyperswarm/secret-stream' */

/**
* @internal
* @typedef {NodeDuplex | StreamxDuplex} RawStream
*/

/**
* @template {RawStream} [T=RawStream]
* @typedef {NoiseSecretStream<T> & { destroyed: true }} DestroyedNoiseStream
*/

/**
* @template {RawStream} [T=RawStream]
* @typedef {NoiseSecretStream<T> & {
* publicKey: Buffer,
* remotePublicKey: Buffer,
* handshake: Buffer,
* destroyed: false
* }} OpenedNoiseStream
*/

/**
* Utility to await a NoiseSecretStream to open, that returns a stream with the
* correct types for publicKey and remotePublicKey (which can be null before
* stream is opened)
*
* @template {RawStream} T
* @param {NoiseSecretStream<T>} stream
* @returns {Promise<OpenedNoiseStream<T> | DestroyedNoiseStream<T>>}
*/
export async function openedNoiseSecretStream(stream) {
await stream.opened
return /** @type {OpenedNoiseStream<T> | DestroyedNoiseStream<T>} */ (stream)
}
7 changes: 4 additions & 3 deletions src/local-peers.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
import pDefer from 'p-defer'
import { Logger } from './logger.js'
import pTimeout, { TimeoutError } from 'p-timeout'
/** @import { OpenedNoiseStream } from './lib/noise-secret-stream-helpers.js' */

// Unique identifier for the mapeo rpc protocol
const PROTOCOL_NAME = 'mapeo/rpc'
Expand Down Expand Up @@ -387,7 +388,7 @@ export class LocalPeers extends TypedEmitter {
}

/**
* @param {Protomux<import('./utils.js').OpenedNoiseStream>} protomux
* @param {Protomux<OpenedNoiseStream>} protomux
* @param {() => void} done
*/
#makePeer(protomux, done) {
Expand Down Expand Up @@ -460,7 +461,7 @@ export class LocalPeers extends TypedEmitter {
}

/**
* @param {Protomux<import('./utils.js').OpenedNoiseStream>} protomux
* @param {Protomux<OpenedNoiseStream>} protomux
*/
#getPeerByProtomux(protomux) {
// We could also index peers by protomux to avoid this, but that would mean
Expand Down Expand Up @@ -511,7 +512,7 @@ export class LocalPeers extends TypedEmitter {

/**
*
* @param {Protomux<import('./utils.js').OpenedNoiseStream>} protomux
* @param {Protomux<OpenedNoiseStream>} protomux
* @param {keyof typeof MESSAGE_TYPES} type
* @param {Buffer} value
*/
Expand Down
3 changes: 2 additions & 1 deletion src/mapeo-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ import {
deNullify,
getDeviceId,
keyToId,
openedNoiseSecretStream,
projectIdToNonce,
projectKeyToId,
projectKeyToProjectInviteId,
projectKeyToPublicId,
} from './utils.js'
import { openedNoiseSecretStream } from './lib/noise-secret-stream-helpers.js'
import { RandomAccessFilePool } from './core-manager/random-access-file-pool.js'
import BlobServerPlugin from './fastify-plugins/blobs.js'
import IconServerPlugin from './fastify-plugins/icons.js'
Expand All @@ -51,6 +51,7 @@ import {
/** @import { ProjectSettingsValue as ProjectValue } from '@mapeo/schema' */
/** @import { SetNonNullable } from 'type-fest' */
/** @import { CoreStorage, Namespace } from './types.js' */
/** @import { OpenedNoiseStream } from './lib/noise-secret-stream-helpers.js' */

/** @typedef {SetNonNullable<ProjectKeys, 'encryptionKeys'>} ValidatedProjectKeys */

Expand Down
3 changes: 2 additions & 1 deletion src/sync/peer-sync-controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { unreplicate } from '../lib/hypercore-helpers.js'
/** @import { Role } from '../roles.js' */
/** @import { SyncEnabledState } from './sync-api.js' */
/** @import { Namespace } from '../types.js' */
/** @import { OpenedNoiseStream } from '../lib/noise-secret-stream-helpers.js' */

/**
* @typedef {Role['sync'][Namespace] | 'unknown'} SyncCapability
Expand Down Expand Up @@ -35,7 +36,7 @@ export class PeerSyncController {

/**
* @param {object} opts
* @param {import("protomux")<import('../utils.js').OpenedNoiseStream>} opts.protomux
* @param {import('protomux')<OpenedNoiseStream>} opts.protomux
* @param {import("../core-manager/index.js").CoreManager} opts.coreManager
* @param {import("./sync-state.js").SyncState} opts.syncState
* @param {import('../roles.js').Roles} opts.roles
Expand Down
3 changes: 2 additions & 1 deletion src/sync/sync-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { ExhaustivenessError, assert, keyToId, noop } from '../utils.js'
import { NO_ROLE_ID } from '../roles.js'
/** @import { CoreOwnership as CoreOwnershipDoc } from '@mapeo/schema' */
/** @import { CoreOwnership } from '../core-ownership.js' */
/** @import { OpenedNoiseStream } from '../lib/noise-secret-stream-helpers.js' */

export const kHandleDiscoveryKey = Symbol('handle discovery key')
export const kSyncState = Symbol('sync state')
Expand Down Expand Up @@ -359,7 +360,7 @@ export class SyncApi extends TypedEmitter {
* will then handle validation of role records to ensure that the peer is
* actually still part of the project.
*
* @param {{ protomux: import('protomux')<import('../utils.js').OpenedNoiseStream> }} peer
* @param {{ protomux: import('protomux')<OpenedNoiseStream> }} peer
*/
#handlePeerAdd = (peer) => {
const { protomux } = peer
Expand Down
22 changes: 0 additions & 22 deletions src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ import sodium from 'sodium-universal'
import { keyToPublicId } from '@mapeo/crypto'
import { createHash } from 'node:crypto'
import stableStringify from 'json-stable-stringify'
/** @import { Duplex as NodeDuplex } from 'node:stream' */
/** @import { Duplex as StreamxDuplex } from 'streamx' */
/** @import NoiseStream from '@hyperswarm/secret-stream' */

const PROJECT_INVITE_ID_SALT = Buffer.from('mapeo project invite id', 'ascii')

Expand Down Expand Up @@ -46,25 +43,6 @@ export function parseVersion(version) {
}
}

/** @typedef {NoiseStream & { destroyed: true }} DestroyedNoiseStream */
/**
* @template {NodeDuplex | StreamxDuplex} [T=NodeDuplex | StreamxDuplex]
* @typedef {NoiseStream<T> & { publicKey: Buffer, remotePublicKey: Buffer, handshake: Buffer }} OpenedNoiseStream
*/

/**
* Utility to await a NoiseSecretStream to open, that returns a stream with the
* correct types for publicKey and remotePublicKey (which can be null before
* stream is opened)
*
* @param {NoiseStream} stream
* @returns {Promise<OpenedNoiseStream | DestroyedNoiseStream>}
*/
export async function openedNoiseSecretStream(stream) {
await stream.opened
return /** @type {OpenedNoiseStream | DestroyedNoiseStream} */ (stream)
}

export class ExhaustivenessError extends Error {
/** @param {never} value */
constructor(value) {
Expand Down
2 changes: 1 addition & 1 deletion tests/discovery/local-discovery.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
LocalDiscovery,
} from '../../src/discovery/local-discovery.js'
import NoiseSecretStream from '@hyperswarm/secret-stream'
/** @import { OpenedNoiseStream } from '../../src/utils.js' */
/** @import { OpenedNoiseStream } from '../../src/lib/noise-secret-stream-helpers.js' */

test('peer discovery - discovery and sharing of data', async (t) => {
const deferred = pDefer()
Expand Down

0 comments on commit c4b47e3

Please sign in to comment.