Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reduce & simplify sync state #411

Merged
merged 4 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/mapeo-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { LocalDiscovery } from './discovery/local-discovery.js'
import { Capabilities } from './capabilities.js'
import NoiseSecretStream from '@hyperswarm/secret-stream'
import { Logger } from './logger.js'
import { kSyncState } from './sync/sync-api.js'

/** @typedef {import("@mapeo/schema").ProjectSettingsValue} ProjectValue */

Expand Down Expand Up @@ -562,7 +563,7 @@ export class MapeoManager extends TypedEmitter {
const {
auth: { localState: authState },
config: { localState: configState },
} = project.$sync.getState()
} = project.$sync[kSyncState].getState()
const isCapabilitySynced = capability !== Capabilities.NO_ROLE_CAPABILITIES
const isProjectSettingsSynced =
projectSettings !== MapeoProject.EMPTY_PROJECT_SETTINGS
Expand All @@ -589,15 +590,15 @@ export class MapeoManager extends TypedEmitter {
timeoutId = setTimeout(onTimeout, timeoutMs)
return
}
project.$sync.off('sync-state', onSyncState)
project.$sync[kSyncState].off('state', onSyncState)
resolve(this.#waitForInitialSync(project, { timeoutMs }))
}
const onTimeout = () => {
project.$sync.off('sync-state', onSyncState)
project.$sync[kSyncState].off('state', onSyncState)
reject(new Error('Sync timeout'))
}
let timeoutId = setTimeout(onTimeout, timeoutMs)
project.$sync.on('sync-state', onSyncState)
project.$sync[kSyncState].on('state', onSyncState)
})
}

Expand Down
3 changes: 3 additions & 0 deletions src/sync/peer-sync-controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import { createMap } from '../utils.js'

/** @type {Namespace[]} */
export const PRESYNC_NAMESPACES = ['auth', 'config', 'blobIndex']
export const DATA_NAMESPACES = NAMESPACES.filter(
(ns) => !PRESYNC_NAMESPACES.includes(ns)
)

export class PeerSyncController {
#replicatingCores = new Set()
Expand Down
109 changes: 97 additions & 12 deletions src/sync/sync-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,45 @@ import { SyncState } from './sync-state.js'
import {
PeerSyncController,
PRESYNC_NAMESPACES,
DATA_NAMESPACES,
} from './peer-sync-controller.js'
import { Logger } from '../logger.js'
import { NAMESPACES } from '../core-manager/index.js'
import { keyToId } from '../utils.js'

export const kHandleDiscoveryKey = Symbol('handle discovery key')
export const kSyncState = Symbol('sync state')

/**
* @typedef {'initial' | 'full'} SyncType
*/

/**
* @typedef {object} SyncTypeState
* @property {number} have Number of blocks we have locally
* @property {number} want Number of blocks we want from connected peers
* @property {number} wanted Number of blocks that connected peers want from us
* @property {number} missing Number of blocks missing (we don't have them, but connected peers don't have them either)
* @property {boolean} dataToSync Is there data available to sync? (want > 0 || wanted > 0)
* @property {boolean} syncing Are we currently syncing?
*/

/**
* @typedef {object} State
* @property {SyncTypeState} initial State of initial sync (sync of auth, metadata and project config)
* @property {SyncTypeState} data State of data sync (observations, map data, photos, audio, video etc.)
* @property {number} connectedPeers Number of connected peers
*/

/**
* @typedef {object} SyncEvents
* @property {(syncState: import('./sync-state.js').State) => void} sync-state
* @property {(syncState: State) => void} sync-state
*/

/**
* @extends {TypedEmitter<SyncEvents>}
*/
export class SyncApi extends TypedEmitter {
syncState
#coreManager
#capabilities
/** @type {Map<import('protomux'), PeerSyncController>} */
Expand All @@ -45,9 +67,13 @@ export class SyncApi extends TypedEmitter {
this.#l = Logger.create('syncApi', logger)
this.#coreManager = coreManager
this.#capabilities = capabilities
this.syncState = new SyncState({ coreManager, throttleMs })
this.syncState.setMaxListeners(0)
this.syncState.on('state', this.emit.bind(this, 'sync-state'))
this[kSyncState] = new SyncState({ coreManager, throttleMs })
this[kSyncState].setMaxListeners(0)
this[kSyncState].on('state', (namespaceSyncState) => {
const state = reduceSyncState(namespaceSyncState)
state.data.syncing = this.#dataSyncEnabled.has('local')
this.emit('sync-state', state)
})

this.#coreManager.creatorCore.on('peer-add', this.#handlePeerAdd)
this.#coreManager.creatorCore.on('peer-remove', this.#handlePeerRemove)
Expand Down Expand Up @@ -81,8 +107,14 @@ export class SyncApi extends TypedEmitter {
}, 500)
}

/**
* Get the current sync state (initial and full). Also emitted via the 'sync-state' event
* @returns {State}
*/
getState() {
return this.syncState.getState()
const state = reduceSyncState(this[kSyncState].getState())
state.data.syncing = this.#dataSyncEnabled.has('local')
return state
}

/**
Expand Down Expand Up @@ -110,16 +142,17 @@ export class SyncApi extends TypedEmitter {
}

/**
* @param {'initial' | 'full'} type
* @param {SyncType} type
*/
async waitForSync(type) {
const state = this.getState()
const state = this[kSyncState].getState()
const namespaces = type === 'initial' ? PRESYNC_NAMESPACES : NAMESPACES
if (isSynced(state, namespaces, this.#peerSyncControllers)) return
return new Promise((res) => {
this.on('sync-state', function onState(state) {
if (!isSynced(state, namespaces, this.#peerSyncControllers)) return
this.off('sync-state', onState)
const _this = this
this[kSyncState].on('state', function onState(state) {
if (!isSynced(state, namespaces, _this.#peerSyncControllers)) return
_this[kSyncState].off('state', onState)
res(null)
})
})
Expand Down Expand Up @@ -147,7 +180,7 @@ export class SyncApi extends TypedEmitter {
const peerSyncController = new PeerSyncController({
protomux,
coreManager: this.#coreManager,
syncState: this.syncState,
syncState: this[kSyncState],
capabilities: this.#capabilities,
logger: this.#l,
})
Expand Down Expand Up @@ -209,3 +242,55 @@ function isSynced(state, namespaces, peerSyncControllers) {
}
return true
}

/**
* Reduce the more detailed sync state we use internally to the public sync
* state that sums namespaces into an 'initial' and 'full' sync state
* @param {import('./sync-state.js').State} namespaceSyncState
* @returns {State}
*/
function reduceSyncState(namespaceSyncState) {
const connectedPeers = Object.values(
namespaceSyncState.auth.remoteStates
).filter((remoteState) => remoteState.status === 'connected').length
const state = {
initial: createInitialSyncTypeState(),
data: createInitialSyncTypeState(),
connectedPeers,
}
for (const ns of PRESYNC_NAMESPACES) {
const nsState = namespaceSyncState[ns]
mutatingAddNamespaceState(state.initial, nsState)
}
for (const ns of DATA_NAMESPACES) {
const nsState = namespaceSyncState[ns]
mutatingAddNamespaceState(state.data, nsState)
}
return state
}

/**
* @param {SyncTypeState} accumulator
* @param {import('./namespace-sync-state.js').SyncState} currentValue
*/
function mutatingAddNamespaceState(accumulator, currentValue) {
accumulator.have += currentValue.localState.have
accumulator.want += currentValue.localState.want
accumulator.wanted += currentValue.localState.wanted
accumulator.missing += currentValue.localState.missing
accumulator.dataToSync ||= currentValue.dataToSync
}

/**
* @returns {SyncTypeState}
*/
function createInitialSyncTypeState() {
return {
have: 0,
want: 0,
wanted: 0,
missing: 0,
dataToSync: false,
syncing: true,
}
}
4 changes: 3 additions & 1 deletion test-e2e/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { generate } from '@mapeo/mock-data'
import { valueOf } from '../src/utils.js'
import pTimeout from 'p-timeout'
import { BLOCKED_ROLE_ID, COORDINATOR_ROLE_ID } from '../src/capabilities.js'
import { kSyncState } from '../src/sync/sync-api.js'

const SCHEMAS_INITIAL_SYNC = ['preset', 'field']

Expand Down Expand Up @@ -226,8 +227,9 @@ test('no sync capabilities === no namespaces sync apart from auth', async (t) =>

await waitForSync([inviteeProject, invitorProject], 'full')

// Reaching into internals here, but only to validate the result of the test, so not fully e2e
const [invitorState, inviteeState, blockedState] = projects.map((p) =>
p.$sync.getState()
p.$sync[kSyncState].getState()
)

t.is(invitorState.config.localState.have, configDocsCount + COUNT) // count device info doc for each invited device
Expand Down
7 changes: 4 additions & 3 deletions test-e2e/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { valueOf } from '../src/utils.js'
import { randomInt } from 'node:crypto'
import { temporaryDirectory } from 'tempy'
import fsPromises from 'node:fs/promises'
import { kSyncState } from '../src/sync/sync-api.js'

const FAST_TESTS = !!process.env.FAST_TESTS
const projectMigrationsFolder = new URL('../drizzle/project', import.meta.url)
Expand Down Expand Up @@ -228,14 +229,14 @@ export function round(value, decimalPlaces) {
* @param {'initial' | 'full'} [type]
*/
async function waitForProjectSync(project, peerIds, type = 'initial') {
const state = await project.$sync.getState()
const state = await project.$sync[kSyncState].getState()
if (hasPeerIds(state.auth.remoteStates, peerIds)) {
return project.$sync.waitForSync(type)
}
return new Promise((res) => {
project.$sync.on('sync-state', function onState(state) {
project.$sync[kSyncState].on('state', function onState(state) {
if (!hasPeerIds(state.auth.remoteStates, peerIds)) return
project.$sync.off('sync-state', onState)
project.$sync[kSyncState].off('state', onState)
res(project.$sync.waitForSync(type))
})
})
Expand Down
Loading