Skip to content

Commit

Permalink
Merge branch 'feat/peer-sync-controller' into feat/sync-controller
Browse files Browse the repository at this point in the history
* feat/peer-sync-controller:
  fix types
  fix bug
  fix PeerSyncController tests
  • Loading branch information
gmaclennan committed Oct 25, 2023
2 parents 314c0ed + c4fd85c commit b625c5b
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 135 deletions.
26 changes: 21 additions & 5 deletions src/sync/namespace-sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export class NamespaceSyncState {
if (this.#cachedState) return this.#cachedState
/** @type {SyncState} */
const state = {
localState: { want: 0, have: 0, wanted: 0, missing: 0 },
localState: createState(),
remoteStates: {},
}
for (const css of this.#coreStates.values()) {
Expand All @@ -65,7 +65,7 @@ export class NamespaceSyncState {
coreState.remoteStates
)) {
if (!(peerId in state.remoteStates)) {
state.remoteStates[peerId] = createPeerState(peerCoreState.status)
state.remoteStates[peerId] = peerCoreState
} else {
mutatingAddPeerState(state.remoteStates[peerId], peerCoreState)
}
Expand Down Expand Up @@ -110,10 +110,26 @@ export class NamespaceSyncState {
}

/**
* @overload
* @returns {SyncState['localState']}
*/

/**
* @overload
* @param {import('./core-sync-state.js').PeerCoreState['status']} status
* @returns {import('./core-sync-state.js').PeerCoreState} */
function createPeerState(status = 'disconnected') {
return { want: 0, have: 0, wanted: 0, missing: 0, status }
* @returns {import('./core-sync-state.js').PeerCoreState}
*/

/**
* @param {import('./core-sync-state.js').PeerCoreState['status']} [status]
* @returns
*/
export function createState(status) {
if (status) {
return { want: 0, have: 0, wanted: 0, missing: 0, status }
} else {
return { want: 0, have: 0, wanted: 0, missing: 0 }
}
}

/**
Expand Down
62 changes: 30 additions & 32 deletions src/sync/peer-sync-controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ export class PeerSyncController {
#peerId
#capabilities
/** @type {Record<Namespace, SyncCapability>} */
#syncCapability = createSyncCapabilityObject('unknown')
#syncCapability = createNamespaceMap('unknown')
#isDataSyncEnabled = false
/** @type {Record<Namespace, import('./core-sync-state.js').CoreState> | undefined} */
#prevLocalState
/** @type {Record<Namespace, import('./core-sync-state.js').CoreState | null>} */
#prevLocalState = createNamespaceMap(null)
/** @type {SyncStatus} */
#syncStatus = createSyncStatusObject()
#syncStatus = createNamespaceMap('unknown')
/** @type {Map<import('hypercore')<'binary', any>, ReturnType<import('hypercore')['download']>>} */
#downloadingRanges = new Map()
/** @type {SyncStatus | undefined} */
#prevSyncStatus
/** @type {SyncStatus} */
#prevSyncStatus = createNamespaceMap('unknown')

/**
* @param {object} opts
Expand Down Expand Up @@ -97,26 +97,34 @@ export class PeerSyncController {
return [ns, nsState.localState]
})

// Map of which namespaces have received new data since last state change
// Map of which namespaces have received new data since last sync change
const didUpdate = mapObject(state, (ns) => {
if (!this.#prevLocalState) return [ns, true]
return [ns, this.#prevLocalState[ns].have !== localState[ns].have]
const nsDidSync =
this.#prevSyncStatus[ns] !== 'synced' &&
this.#syncStatus[ns] === 'synced'
const prevNsState = this.#prevLocalState[ns]
const nsDidUpdate =
nsDidSync &&
(prevNsState === null || prevNsState.have !== localState[ns].have)
if (nsDidUpdate) {
this.#prevLocalState[ns] = localState[ns]
}
return [ns, nsDidUpdate]
})
this.#prevLocalState = localState
this.#prevSyncStatus = this.#syncStatus

if (didUpdate.auth && this.#syncStatus.auth === 'synced') {
if (didUpdate.auth) {
try {
const cap = await this.#capabilities.getCapabilities(this.#peerId)
this.#syncCapability = cap.sync
} catch (e) {
// Any error, consider sync blocked
this.#syncCapability = createSyncCapabilityObject('blocked')
this.#syncCapability = createNamespaceMap('blocked')
}
}
console.log('sync status', this.#peerId, this.#syncStatus)
console.log('cap', this.#syncCapability)
console.log('state', state.auth)
// console.log(this.#peerId.slice(0, 7), this.#syncCapability)
// console.log(this.#peerId.slice(0, 7), didUpdate)
// console.dir(state, { depth: null, colors: true })

// If any namespace has new data, update what is enabled
if (Object.values(didUpdate).indexOf(true) > -1) {
Expand Down Expand Up @@ -255,23 +263,13 @@ function getSyncStatus(peerId, state) {
}

/**
* @param {SyncCapability} capability
* @returns {Record<Namespace, SyncCapability>} */
function createSyncCapabilityObject(capability) {
const cap = /** @type {Record<Namespace, SyncCapability>} */ ({})
for (const ns of NAMESPACES) {
cap[ns] = capability
}
return cap
}

/**
* @returns {SyncStatus}
*/
function createSyncStatusObject() {
const status = /** @type {SyncStatus} */ ({})
* @template T
* @param {T} value
* @returns {Record<Namespace, T>} */
function createNamespaceMap(value) {
const map = /** @type {Record<Namespace, T>} */ ({})
for (const ns of NAMESPACES) {
status[ns] = 'unknown'
map[ns] = value
}
return status
return map
}
6 changes: 5 additions & 1 deletion tests/helpers/rpc.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import NoiseSecretStream from '@hyperswarm/secret-stream'

/**
* @typedef {ReturnType<import('@hyperswarm/secret-stream').keyPair>} KeyPair
*/

/**
* @param {import('../../src/rpc/index.js').MapeoRPC} rpc1
* @param {import('../../src/rpc/index.js').MapeoRPC} rpc2
* @param { {kp1?: import('@hyperswarm/secret-stream'), kp2?: import('@hyperswarm/secret-stream')} } [keyPairs]
* @param { {kp1?: KeyPair, kp2?: KeyPair} } [keyPairs]
* @returns {() => Promise<[void, void]>}
*/
export function replicate(
Expand Down
Loading

0 comments on commit b625c5b

Please sign in to comment.