Skip to content

Commit

Permalink
Merge branch 'main' into feat/sync-controller
Browse files Browse the repository at this point in the history
  • Loading branch information
gmaclennan authored Oct 26, 2023
2 parents edeb94c + 2ff2a99 commit 50ed1de
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 49 deletions.
47 changes: 29 additions & 18 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@
"streamx": "^2.15.1",
"tempy": "^3.1.0",
"ts-proto": "^1.156.7",
"type-fest": "^3.10.0",
"typedoc": "^0.24.8",
"typedoc-plugin-markdown": "^3.15.3",
"typescript": "^5.1.6"
Expand Down Expand Up @@ -145,6 +144,7 @@
"sub-encoder": "^2.1.1",
"throttle-debounce": "^5.0.0",
"tiny-typed-emitter": "^2.1.0",
"type-fest": "^4.5.0",
"varint": "^6.0.0",
"z32": "^1.0.1"
}
Expand Down
4 changes: 2 additions & 2 deletions src/discovery/mdns.js → src/discovery/local-discovery.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export const ERR_DUPLICATE = 'Duplicate connection'
/**
* @extends {TypedEmitter<DiscoveryEvents>}
*/
export class MdnsDiscovery extends TypedEmitter {
export class LocalDiscovery extends TypedEmitter {
#identityKeypair
#server
/** @type {Map<string, NoiseSecretStream<net.Socket>>} */
Expand Down Expand Up @@ -254,7 +254,7 @@ export class MdnsDiscovery extends TypedEmitter {
}

/**
* @type {MdnsDiscovery['stop']}
* @type {LocalDiscovery['stop']}
*/
async #stop({ force = false, timeout = 0 } = {}) {
this.#log('stopping')
Expand Down
18 changes: 9 additions & 9 deletions src/invite-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ export class InviteApi extends TypedEmitter {

if (isAlreadyMember) {
for (const peerId of peersToRespondTo) {
this.#sendAlreadyResponse({ peerId, projectId })
await this.#sendAlreadyResponse({ peerId, projectId })
}
return
}
Expand All @@ -123,7 +123,7 @@ export class InviteApi extends TypedEmitter {
}

try {
this.#sendAcceptResponse({
await this.#sendAcceptResponse({
peerId: pendingInvite.fromPeerId,
projectId,
})
Expand Down Expand Up @@ -162,7 +162,7 @@ export class InviteApi extends TypedEmitter {
)
}
for (const peerId of this.#peersToRespondTo.get(projectId)) {
this.#sendRejectResponse({ peerId, projectId })
await this.#sendRejectResponse({ peerId, projectId })
}
}

Expand All @@ -171,9 +171,9 @@ export class InviteApi extends TypedEmitter {
*
* @param {{ peerId: string, projectId: string }} opts
*/
#sendAcceptResponse({ peerId, projectId }) {
async #sendAcceptResponse({ peerId, projectId }) {
const projectKey = Buffer.from(projectId, 'hex')
this.rpc.inviteResponse(peerId, {
await this.rpc.inviteResponse(peerId, {
projectKey,
decision: InviteResponse_Decision.ACCEPT,
})
Expand All @@ -184,10 +184,10 @@ export class InviteApi extends TypedEmitter {
*
* @param {{ peerId: string, projectId: string }} opts
*/
#sendAlreadyResponse({ peerId, projectId }) {
async #sendAlreadyResponse({ peerId, projectId }) {
const projectKey = Buffer.from(projectId, 'hex')
try {
this.rpc.inviteResponse(peerId, {
await this.rpc.inviteResponse(peerId, {
projectKey,
decision: InviteResponse_Decision.ALREADY,
})
Expand All @@ -202,10 +202,10 @@ export class InviteApi extends TypedEmitter {
*
* @param {{ peerId: string, projectId: string }} opts
*/
#sendRejectResponse({ peerId, projectId }) {
async #sendRejectResponse({ peerId, projectId }) {
const projectKey = Buffer.from(projectId, 'hex')
try {
this.rpc.inviteResponse(peerId, {
await this.rpc.inviteResponse(peerId, {
projectKey,
decision: InviteResponse_Decision.REJECT,
})
Expand Down
36 changes: 24 additions & 12 deletions src/rpc/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
InviteResponse,
InviteResponse_Decision,
} from '../generated/rpc.js'
import pDefer from 'p-defer'

const PROTOCOL_NAME = 'mapeo/rpc'

Expand Down Expand Up @@ -40,6 +41,7 @@ class Peer {
#state = 'connecting'
#publicKey
#channel
#connected
/** @type {Map<string, Array<DeferredPromise<InviteResponse['decision']>>>} */
pendingInvites = new Map()

Expand All @@ -51,6 +53,7 @@ class Peer {
constructor({ publicKey, channel }) {
this.#publicKey = publicKey
this.#channel = channel
this.#connected = pDefer()
}
get info() {
return {
Expand All @@ -72,6 +75,7 @@ class Peer {
return // TODO: report error - this should not happen
}
this.#state = 'connected'
this.#connected.resolve()
break
case 'disconnect':
/* c8 ignore next */
Expand All @@ -87,27 +91,28 @@ class Peer {
}
}
/** @param {InviteWithKeys} invite */
sendInvite(invite) {
this.#assertConnected()
async sendInvite(invite) {
await this.#assertConnected()
const buf = Buffer.from(Invite.encode(invite).finish())
const messageType = MESSAGE_TYPES.Invite
this.#channel.messages[messageType].send(buf)
}
/** @param {InviteResponse} response */
sendInviteResponse(response) {
this.#assertConnected()
async sendInviteResponse(response) {
await this.#assertConnected()
const buf = Buffer.from(InviteResponse.encode(response).finish())
const messageType = MESSAGE_TYPES.InviteResponse
this.#channel.messages[messageType].send(buf)
}
/** @param {DeviceInfo} deviceInfo */
sendDeviceInfo(deviceInfo) {
this.#assertConnected()
async sendDeviceInfo(deviceInfo) {
await this.#assertConnected()
const buf = Buffer.from(DeviceInfo.encode(deviceInfo).finish())
const messageType = MESSAGE_TYPES.DeviceInfo
this.#channel.messages[messageType].send(buf)
}
#assertConnected() {
async #assertConnected() {
await this.#connected.promise
if (this.#state === 'connected' && !this.#channel.closed) return
/* c8 ignore next */
throw new PeerDisconnectedError() // TODO: report error - this should not happen
Expand All @@ -125,6 +130,8 @@ class Peer {
export class MapeoRPC extends TypedEmitter {
/** @type {Map<string, Peer>} */
#peers = new Map()
/** @type {Set<Promise<any>>} */
#opening = new Set()

constructor() {
super()
Expand All @@ -145,6 +152,7 @@ export class MapeoRPC extends TypedEmitter {
* @returns {Promise<InviteResponse['decision']>}
*/
async invite(peerId, { timeout, ...invite }) {
await Promise.all(this.#opening)
const peer = this.#peers.get(peerId)
if (!peer) console.log([...this.#peers.keys()])
if (!peer) throw new UnknownPeerError('Unknown peer ' + peerId)
Expand All @@ -168,7 +176,7 @@ export class MapeoRPC extends TypedEmitter {
origReject(new TimeoutError(`No response after ${timeout}ms`))
}, timeout)

peer.sendInvite(invite)
peer.sendInvite(invite).catch(origReject)

/** @type {typeof origResolve} */
function resolve(value) {
Expand All @@ -191,21 +199,23 @@ export class MapeoRPC extends TypedEmitter {
* @param {InviteResponse['projectKey']} options.projectKey project key of the invite you are responding to
* @param {InviteResponse['decision']} options.decision response to invite, one of "ACCEPT", "REJECT", or "ALREADY" (already on project)
*/
inviteResponse(peerId, options) {
async inviteResponse(peerId, options) {
await Promise.all(this.#opening)
const peer = this.#peers.get(peerId)
if (!peer) throw new UnknownPeerError('Unknown peer ' + peerId)
peer.sendInviteResponse(options)
await peer.sendInviteResponse(options)
}

/**
*
* @param {string} peerId id of the peer you want to send to (publicKey of peer as hex string)
* @param {DeviceInfo} deviceInfo device info to send
*/
sendDeviceInfo(peerId, deviceInfo) {
async sendDeviceInfo(peerId, deviceInfo) {
await Promise.all(this.#opening)
const peer = this.#peers.get(peerId)
if (!peer) throw new UnknownPeerError('Unknown peer ' + peerId)
peer.sendDeviceInfo(deviceInfo)
await peer.sendDeviceInfo(deviceInfo)
}

/**
Expand All @@ -219,10 +229,12 @@ export class MapeoRPC extends TypedEmitter {
stream.userData && Protomux.isProtomux(stream.userData)
? stream.userData
: Protomux.from(stream)
this.#opening.add(stream.opened)

// noiseSecretStream.remotePublicKey can be null before the stream has
// opened, so this helped awaits the open
openedNoiseSecretStream(stream).then((stream) => {
this.#opening.delete(stream.opened)
if (stream.destroyed) return
const { remotePublicKey } = stream

Expand Down
1 change: 0 additions & 1 deletion src/sync/namespace-sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ export class NamespaceSyncState {

/**
* @param {import('./core-sync-state.js').PeerCoreState['status']} [status]
* @returns
*/
export function createState(status) {
if (status) {
Expand Down
13 changes: 8 additions & 5 deletions tests/discovery/mdns.js → tests/discovery/local-discovery.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,21 @@ import { KeyManager } from '@mapeo/crypto'
import { setTimeout as delay } from 'node:timers/promises'
import pDefer from 'p-defer'
import { keyToPublicId } from '@mapeo/crypto'
import { ERR_DUPLICATE, MdnsDiscovery } from '../../src/discovery/mdns.js'
import {
ERR_DUPLICATE,
LocalDiscovery,
} from '../../src/discovery/local-discovery.js'
import NoiseSecretStream from '@hyperswarm/secret-stream'

test('mdns - discovery and sharing of data', (t) => {
const deferred = pDefer()
const identityKeypair1 = new KeyManager(randomBytes(16)).getIdentityKeypair()
const identityKeypair2 = new KeyManager(randomBytes(16)).getIdentityKeypair()

const mdnsDiscovery1 = new MdnsDiscovery({
const mdnsDiscovery1 = new LocalDiscovery({
identityKeypair: identityKeypair1,
})
const mdnsDiscovery2 = new MdnsDiscovery({
const mdnsDiscovery2 = new LocalDiscovery({
identityKeypair: identityKeypair2,
})
const str = 'hi'
Expand Down Expand Up @@ -52,7 +55,7 @@ test('deduplicate incoming connections', async (t) => {

const localKp = new KeyManager(randomBytes(16)).getIdentityKeypair()
const remoteKp = new KeyManager(randomBytes(16)).getIdentityKeypair()
const discovery = new MdnsDiscovery({ identityKeypair: localKp })
const discovery = new LocalDiscovery({ identityKeypair: localKp })
await discovery.start()

discovery.on('connection', (conn) => {
Expand Down Expand Up @@ -113,7 +116,7 @@ async function testMultiple(t, { period, nPeers = 20 }) {

async function spawnPeer(onConnected) {
const identityKeypair = new KeyManager(randomBytes(16)).getIdentityKeypair()
const discovery = new MdnsDiscovery({ identityKeypair })
const discovery = new LocalDiscovery({ identityKeypair })
const peerId = keyToPublicId(discovery.publicKey)
peersById.set(peerId, discovery)
const conns = []
Expand Down
1 change: 0 additions & 1 deletion tests/invite-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,6 @@ test('invitor disconnecting results in accept throwing', async (t) => {

r1.on('peers', async (peers) => {
if (peers.length !== 1 || peers[0].status === 'disconnected') return

await t.exception(() => {
return r1.invite(peers[0].id, {
projectKey,
Expand Down
Loading

0 comments on commit 50ed1de

Please sign in to comment.