Skip to content

Commit

Permalink
WIP: connectServers and disconnectServers()
Browse files Browse the repository at this point in the history
  • Loading branch information
gmaclennan and EvanHahn committed Oct 2, 2024
1 parent 36cb447 commit 48a3c2e
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 12 deletions.
30 changes: 25 additions & 5 deletions src/mapeo-project.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ export class MapeoProject extends TypedEmitter {
this.#projectId = projectKeyToId(projectKey)
this.#loadingConfig = false

const getReplicationStream = this[kProjectReplicate].bind(
this,
// TODO: See if we can fix these
/** @type {any} */ (true)
)

///////// 1. Setup database
this.#sqlite = new Database(dbPath)
const db = drizzle(this.#sqlite)
Expand Down Expand Up @@ -302,11 +308,7 @@ export class MapeoProject extends TypedEmitter {
encryptionKeys,
projectKey,
rpc: localPeers,
getReplicationStream: this[kProjectReplicate].bind(
this,
// TODO: See if we can fix these
/** @type {any} */ (true)
),
getReplicationStream,
// TODO: This should be scoped to a single peer, not all peers
waitForInitialSync: () => this.$sync.waitForSync('initial'),
dataTypes: {
Expand Down Expand Up @@ -349,6 +351,24 @@ export class MapeoProject extends TypedEmitter {
coreOwnership: this.#coreOwnership,
roles: this.#roles,
logger: this.#l,
getServerWebsocketUrls: async () => {
const members = await this.#memberApi.getMany()
/** @type {string[]} */
const serverWebsocketUrls = []
for (const member of members) {
if (
member.deviceType === 'selfHostedServer' &&
member.selfHostedServerDetails
) {
const { baseUrl } = member.selfHostedServerDetails
const wsUrl = new URL(`/sync/${this.#projectId}`, baseUrl)
wsUrl.protocol = wsUrl.protocol === 'http:' ? 'ws:' : 'wss:'
serverWebsocketUrls.push(wsUrl.href)
}
}
return serverWebsocketUrls
},
getReplicationStream,
})

this.#translationApi = new TranslationApi({
Expand Down
2 changes: 2 additions & 0 deletions src/member-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { abortSignalAny } from './lib/ponyfills.js'
import timingSafeEqual from './lib/timing-safe-equal.js'
import { MEMBER_ROLE_ID, ROLES, isRoleIdForNewInvite } from './roles.js'
import { wsCoreReplicator } from './server/ws-core-replicator.js'
import { once } from 'node:events'
/**
* @import {
* DeviceInfo,
Expand Down Expand Up @@ -352,6 +353,7 @@ export class MemberApi extends TypedEmitter {
await this.#waitForInitialSync()

websocket.close()
await once(websocket, 'close')
}

/**
Expand Down
7 changes: 5 additions & 2 deletions src/sync/core-sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -389,16 +389,19 @@ export function deriveState(coreState) {
let iWantFromSomeoneElse = 0

for (const [peerId, peer] of peers.entries()) {
const remoteState = remoteStates[peerId]
const shouldAddToLocalState = remoteState?.status !== 'stopped'

const peerHaves = peer.haveWord(i) & truncate
remoteStates[peerId].have += bitCount32(peerHaves)

const theyWantFromMe = peer.wantWord(i) & ~peerHaves & localHaves
remoteStates[peerId].want += bitCount32(theyWantFromMe)
someoneElseWantsFromMe |= theyWantFromMe
if (shouldAddToLocalState) someoneElseWantsFromMe |= theyWantFromMe

const iWantFromThem = peerHaves & ~localHaves
remoteStates[peerId].wanted += bitCount32(iWantFromThem)
iWantFromSomeoneElse |= iWantFromThem
if (shouldAddToLocalState) iWantFromSomeoneElse |= iWantFromThem
}

localState.wanted += bitCount32(someoneElseWantsFromMe)
Expand Down
71 changes: 69 additions & 2 deletions src/sync/sync-api.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { TypedEmitter } from 'tiny-typed-emitter'
import WebSocket from 'ws'
import { SyncState } from './sync-state.js'
import { PeerSyncController } from './peer-sync-controller.js'
import { Logger } from '../logger.js'
Expand All @@ -9,9 +10,11 @@ import {
} from '../constants.js'
import { ExhaustivenessError, assert, keyToId, noop } from '../utils.js'
import { NO_ROLE_ID } from '../roles.js'
import { wsCoreReplicator } from '../server/ws-core-replicator.js'
/** @import { CoreOwnership as CoreOwnershipDoc } from '@comapeo/schema' */
/** @import { CoreOwnership } from '../core-ownership.js' */
/** @import { OpenedNoiseStream } from '../lib/noise-secret-stream-helpers.js' */
/** @import { ReplicationStream } from '../types.js' */

export const kHandleDiscoveryKey = Symbol('handle discovery key')
export const kSyncState = Symbol('sync state')
Expand Down Expand Up @@ -79,20 +82,31 @@ export class SyncApi extends TypedEmitter {
#l

/**
*
* @param {object} opts
* @param {import('../core-manager/index.js').CoreManager} opts.coreManager
* @param {CoreOwnership} opts.coreOwnership
* @param {import('../roles.js').Roles} opts.roles
* @param {() => Promise<Iterable<string>>} opts.getServerWebsocketUrls
* @param {() => ReplicationStream} opts.getReplicationStream
* @param {number} [opts.throttleMs]
* @param {Logger} [opts.logger]
*/
constructor({ coreManager, throttleMs = 200, roles, logger, coreOwnership }) {
constructor({
coreManager,
throttleMs = 200,
roles,
getServerWebsocketUrls,
getReplicationStream,
logger,
coreOwnership,
}) {
super()
this.#l = Logger.create('syncApi', logger)
this.#coreManager = coreManager
this.#coreOwnership = coreOwnership
this.#roles = roles
this.#getServerWebsocketUrls = getServerWebsocketUrls
this.#getReplicationStream = getReplicationStream
this[kSyncState] = new SyncState({
coreManager,
throttleMs,
Expand Down Expand Up @@ -271,6 +285,59 @@ export class SyncApi extends TypedEmitter {
this.emit('sync-state', this.#getState(namespaceSyncState))
}

// TODO: Move these higher up
#getServerWebsocketUrls
#getReplicationStream
/** @type {Map<string, WebSocket>} */
#serverWebsockets = new Map()

/**
* @returns {void}
*/
connectServers() {
// TODO: decide how to handle this async stuff
this.#getServerWebsocketUrls()
.then((urls) => {
for (const url of urls) {
const existingWebsocket = this.#serverWebsockets.get(url)
console.log('@@@@', 'connecting to', url)
if (
existingWebsocket &&
(existingWebsocket.readyState === WebSocket.OPEN ||
existingWebsocket.readyState === WebSocket.CONNECTING)
) {
continue
}

const websocket = new WebSocket(url)

// TODO: Remove this after we've debugged why we're getting a 400 error
websocket.on('unexpected-respose', (req, res) => {
console.log('@@@@', 'unexpected response', req, res)
})

const replicationStream = this.#getReplicationStream()
wsCoreReplicator(websocket, replicationStream)

this.#serverWebsockets.set(url, websocket)
websocket.once('close', () => {
this.#serverWebsockets.delete(url)
})
}
})
.catch(noop)
}

/**
* @returns {void}
*/
disconnectServers() {
for (const websocket of this.#serverWebsockets.values()) {
websocket.close()
}
this.#serverWebsockets.clear()
}

/**
* Start syncing data cores.
*
Expand Down
114 changes: 111 additions & 3 deletions test-e2e/server.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
import assert from 'node:assert/strict'
import test from 'node:test'
import { MEMBER_ROLE_ID } from '../src/roles.js'
import { createManager, createManagers, getManagerOptions } from './utils.js'
import {
connectPeers,
createManager,
createManagers,
getManagerOptions,
invite,
waitForPeers,
waitForSync,
} from './utils.js'
import createServer from '../src/server/app.js'
import { valueOf } from '@comapeo/schema'
import { generate } from '@mapeo/mock-data'
/** @import { MapeoManager } from '../src/mapeo-manager.js' */

// TODO: test invalid base URL
// TODO: test bad requests
Expand Down Expand Up @@ -59,18 +70,115 @@ test("can't add a server to two different projects", async (t) => {
}, Error)
})

test.only('TODO', { timeout: 2 ** 30 }, async (t) => {
// create two managers
const managers = await createManagers(2, t, 'mobile')
const [managerA, managerB] = managers

// manager 1 sets up server
const projectId = await managerA.createProject({ name: 'foo' })
const managerAProject = await managerA.getProject(projectId)
const serverBaseUrl = await createTestServer(t)
await managerAProject.$member.addServerPeer(serverBaseUrl, {
dangerouslyAllowInsecureConnections: true,
})

// TODO: remove this
await new Promise((resolve) => {
setTimeout(resolve, 3000)
})

// manager 1, invite manager 2
const disconnect1 = connectPeers(managers)
t.after(disconnect1)
await waitForPeers(managers)
await invite({ invitor: managerA, invitees: [managerB], projectId })
const managerBProject = await managerB.getProject(projectId)

// sync managers (to tell manager 2 about server)
const projects = [managerAProject, managerBProject]
await waitForSync(projects, 'initial')
const members = await managerBProject.$member.getMany() // TODO: maybe rename this
const serverPeer = members.find(
(member) => member.deviceType === 'selfHostedServer'
)
assert(serverPeer, 'expected a server peer to be found by the client')

// disconnect managers
disconnect1()
await waitForNoPeersToBeConnected(managerA)
await waitForNoPeersToBeConnected(managerB)

// Start both syncing data
managerAProject.$sync.start()
managerBProject.$sync.start()
console.log('@@@@', 'about to connect servers')
managerAProject.$sync.connectServers()
managerBProject.$sync.connectServers()
console.log('@@@@', 'connected servers')
await waitForSyncWithServer() // TODO this is bogus and silly, just used for waiting
console.log('@@@@', 'waited a bit')

// manager 1 adds data, syncs with server
const observation = await managerAProject.observation.create(
valueOf(generate('observation')[0])
)
await waitForSyncWithServer()

// Check manager 2 doesn't have the data
await assert.rejects(() =>
managerBProject.observation.getByDocId(observation.docId)
)

// manager 2 now has data from manager 1, even though it wasn't connected to manager 1 directly
await waitForSyncWithServer()
assert(
await managerBProject.observation.getByDocId(observation.docId),
'manager B now sees data'
)
})

/**
*
* @param {import('node:test').TestContext} t
* @returns {Promise<string>} server base URL
*/
async function createTestServer(t) {
// TODO: Use a port that's guaranteed to be open
const port = 9876
const server = createServer({
...getManagerOptions('test server'),
serverName: 'test server',
serverPublicBaseUrl: 'https://mapeo.example',
serverPublicBaseUrl: 'http://localhost:' + port,
})
const serverAddress = await server.listen()
const serverAddress = await server.listen({ port })
t.after(() => server.close())
return serverAddress
}

/**
* @param {MapeoManager} manager
* @returns {Promise<void>}
*/
function waitForNoPeersToBeConnected(manager) {
return new Promise((resolve) => {
const checkIfDone = async () => {
const isDone = (await manager.listLocalPeers()).every(
(peer) => peer.status === 'disconnected'
)
if (isDone) {
manager.off('local-peers', checkIfDone)
resolve()
}
}
manager.on('local-peers', checkIfDone)
checkIfDone()
})
}

function waitForSyncWithServer() {
// TODO: This is fake
return new Promise((resolve) => {
setTimeout(resolve, 3000)
})
}

0 comments on commit 48a3c2e

Please sign in to comment.