Skip to content

Commit

Permalink
Progress on re-connect during initial sync
Browse files Browse the repository at this point in the history
  • Loading branch information
ShishKabab committed Apr 17, 2020
1 parent 5cf27f1 commit 1522ab6
Show file tree
Hide file tree
Showing 10 changed files with 550 additions and 290 deletions.
312 changes: 220 additions & 92 deletions ts/fast-sync/channels.ts

Large diffs are not rendered by default.

24 changes: 21 additions & 3 deletions ts/fast-sync/chunking.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
getStringChunk,
receiveInChucks,
} from './chunking'
import Interruptable from './interruptable'

describe('Fast sync channel package chunking', () => {
it('should calculate the right chunk count for strings exactly fitting the chunk size', () => {
Expand Down Expand Up @@ -31,8 +32,25 @@ describe('Fast sync channel package chunking', () => {
it('should correctly receive data in chunks', async () => {
const chunks = [`chunk:0:3:ab`, `chunk:1:3:cde`, `chunk:2:3:fghij`]

expect(await receiveInChucks(async () => chunks.shift()!)).toEqual(
'abcdefghij',
)
const interruptable = new Interruptable()
expect(
await receiveInChucks(async () => chunks.shift()!, interruptable),
).toEqual('abcdefghij')
})

it('should throw an exception when cancelled', async () => {
const chunks = [`chunk:0:3:ab`, `chunk:1:3:cde`, `chunk:2:3:fghij`]
let index = -1

const interruptable = new Interruptable({ throwOnCancelled: true })
await expect(
receiveInChucks(async () => {
++index
if (index === 1) {
await interruptable.cancel()
}
return chunks.shift()!
}, interruptable),
).rejects.toThrow('Tried to execute code on a cancelled interruptable')
})
})
48 changes: 41 additions & 7 deletions ts/fast-sync/chunking.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { splitWithTail } from './utils'
import Interruptable from './interruptable'

export function calculateStringChunkCount(
s: string,
Expand All @@ -17,21 +18,20 @@ export function getStringChunk(

export async function receiveInChucks(
receiveChunk: () => Promise<string>,
interruptable: Interruptable,
): Promise<string> {
let data: string[] = []
let expectedChunkCount: null | number = null

while (true) {
const chunk = await receiveChunk()

const processChunk = (chunk: string) => {
const [
chunkConfirmation,
chunkIndexString,
chunkCountString,
chunkContent,
] = splitWithTail(chunk, ':', 4)
if (chunkConfirmation !== 'chunk') {
throw new Error(`Invalid WebRTC package received: ${chunk}`)
throw new Error(`Invalid WebRTC chunk package received: ${chunk}`)
}

const chunkIndex = parseInt(chunkIndexString)
Expand Down Expand Up @@ -67,10 +67,44 @@ export async function receiveInChucks(
}

data.push(chunkContent)
if (data.length === expectedChunkCount) {
break
}
return { finished: data.length === expectedChunkCount }
}

let running = true
await interruptable.whileLoop(
async () => running,
async () => {
const chunk = (await interruptable.execute(receiveChunk))!
const result = await interruptable.execute(async () =>
processChunk(chunk),
)
if (result?.finished) {
running = false
}
},
)

return data.join('')
}

export async function sendInChunks(
message: string,
send: (chunk: string) => Promise<void>,
options: {
interruptable: Interruptable
chunkSize: number
},
) {
const chunkCount = calculateStringChunkCount(message, options)
let chunkIndex = -1
await options.interruptable.whileLoop(
async () => chunkIndex < chunkCount,
async () => {
chunkIndex += 1
const chunkContent = getStringChunk(message, chunkIndex, {
chunkSize: options.chunkSize,
})
await send(`chunk:${chunkIndex}:${chunkCount}:${chunkContent}`)
},
)
}
4 changes: 0 additions & 4 deletions ts/fast-sync/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,9 @@ async function createWebRTCSyncChannels(options: {
return {
senderChannel: new WebRTCFastSyncChannel({
peer: peers[0],
reEstablishConnection: async () => peers[0],
maxReconnectAttempts: 0,
}),
receiverChannel: new WebRTCFastSyncChannel({
peer: peers[1],
reEstablishConnection: async () => peers[1],
maxReconnectAttempts: 0,
}),
}
}
Expand Down
3 changes: 1 addition & 2 deletions ts/fast-sync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export class FastSync {

constructor(private options: FastSyncOptions) {
this.totalObjectsProcessed = 0
this.forwardReconnectEvents()
}

get state() {
Expand Down Expand Up @@ -104,7 +105,6 @@ export class FastSync {

const interruptable = (this.interruptable = new Interruptable())
this._state = 'running'
this.forwardReconnectEvents()

try {
const syncInfo =
Expand Down Expand Up @@ -224,7 +224,6 @@ export class FastSync {

this.options.channel.events.on('paused', stateChangeHandler('paused'))
this.options.channel.events.on('resumed', stateChangeHandler('resumed'))
this.forwardReconnectEvents()

try {
const syncInfo = await this.options.channel.receiveSyncInfo()
Expand Down
11 changes: 10 additions & 1 deletion ts/fast-sync/interruptable.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import { ResolvablePromise, resolvablePromise } from './utils'

export class InterruptableCancelledError extends Error {}

export default class Interruptable {
cancelled: boolean = false
private pausePromise: ResolvablePromise<void> | null = null // only set if paused, resolved when pause ends

constructor(private options?: { throwOnCancelled: boolean }) {}

get paused(): boolean {
return !!this.pausePromise
}
Expand Down Expand Up @@ -73,7 +77,7 @@ export default class Interruptable {
}
}

async execute(f: () => Promise<void>) {
async execute<ReturnValue>(f: () => Promise<ReturnValue | undefined>) {
if (await this._shouldCancelAfterWaitingForPause()) {
return
}
Expand All @@ -85,6 +89,11 @@ export default class Interruptable {
if (this.pausePromise) {
await this.pausePromise.promise
}
if (this.cancelled && this.options?.throwOnCancelled) {
throw new InterruptableCancelledError(
'Tried to execute code on a cancelled interruptable',
)
}
return this.cancelled
}
}
22 changes: 13 additions & 9 deletions ts/fast-sync/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,20 @@ export type FastSyncRole = 'sender' | 'receiver'
export const flippedRole = (role: FastSyncRole): FastSyncRole =>
role === 'sender' ? 'receiver' : 'sender'
export type FastSyncOrder = 'receive-first' | 'send-first'
export type FastSyncPackage<UserPackageType = any> =
| { type: 'batch'; batch: any }
| { type: 'confirm' }
| { type: 'state-change'; state: 'paused' | 'running' }
| { type: 'sync-info'; info: FastSyncInfo }
| { type: 'finish' }
| { type: 'user-package'; package: UserPackageType }
export type FastSyncPackage<
UserPackageType = any,
WithIndex extends boolean = true
> = (WithIndex extends true ? { index: number } : {}) &
(
| { type: 'sync-info'; info: FastSyncInfo }
| { type: 'batch'; batch: any }
| { type: 'finish' }
| { type: 'state-change'; state: 'paused' | 'running' }
| { type: 'user-package'; package: UserPackageType }
| { type: 'confirm' }
)

export interface FastSyncChannelEvents {
reconnected: () => void
reconnect: (event: { attempt: number }) => void
stalled: () => void
resumed: () => void
paused: () => void
Expand Down
50 changes: 30 additions & 20 deletions ts/integration/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import {
createMemorySharedSyncLog,
lazyMemorySignalTransportFactory,
} from './index.tests'
import { registerModuleMapCollections } from '@worldbrain/storex-pattern-modules'
import { FastSyncEvents } from '../fast-sync'
import { PromiseContentType } from '../types.test'
import { FastSyncChannel } from '../fast-sync/types'
Expand Down Expand Up @@ -49,6 +48,9 @@ describe('Integration helpers', () => {
batchSize: 1,
})
initialSync.wrtc = wrtc
initialSync.events.on('error', ({ error }) => {
throw error
})

const continuousSyncDeps: ContinuousSyncDependencies = {
auth: { getUserId: async () => 456 },
Expand Down Expand Up @@ -88,22 +90,17 @@ describe('Integration helpers', () => {
const doInitialSync = async (options: {
source: {
initialSync: InitialSync
fastSyncChannelSetup?: (channel: FastSyncChannel) => void
}
target: {
initialSync: InitialSync
fastSyncChannelSetup?: (channel: FastSyncChannel) => void
}
}) => {
const {
initialMessage,
} = await options.source.initialSync.requestInitialSync({
fastSyncChannelSetup: options.source.fastSyncChannelSetup,
})
} = await options.source.initialSync.requestInitialSync()

await options.target.initialSync.answerInitialSync({
initialMessage,
fastSyncChannelSetup: options.target.fastSyncChannelSetup,
})

for (const client of [options.source, options.target]) {
Expand Down Expand Up @@ -253,6 +250,10 @@ describe('Integration helpers', () => {
},
},
})
// integration[0].initialSync.debug = true
// integration[1].initialSync.debug = true
// integration[0].initialSync.peerName = `Peer 0`
// integration[1].initialSync.peerName = `Peer 1`

await clients[0].storageManager
.collection('test')
Expand All @@ -269,23 +270,32 @@ describe('Integration helpers', () => {
integration[0].initialSync.events.on('reconnected', () => {
reconnected = true
})
integration[0].initialSync.events.on(
'fastSyncChannelCreated',
channel => {
// channel.timeoutInMiliseconds = 100;
;(channel as any).peerName = 'Peer 0'
},
)
integration[1].initialSync.events.on(
'fastSyncChannelCreated',
channel => {
// let packageCounter = 0
// channel.preSend = async () => {
// if (++packageCounter === 2) {
// return new Promise(resolve => setTimeout(resolve, 200))
// }
// }
// channel.timeoutInMiliseconds = 100;
;(channel as any).peerName = 'Peer 1'
},
)

expect(reconnected).toBe(false)

await doInitialSync({
source: {
...integration[0],
fastSyncChannelSetup: channel => {
channel.timeoutInMiliseconds = 100
},
},
target: {
...integration[1],
fastSyncChannelSetup: channel => {
channel.preSend = () =>
new Promise(resolve => setTimeout(resolve, 500))
},
},
source: integration[0],
target: integration[1],
})

expect(reconnected).toBe(true)
Expand Down
Loading

0 comments on commit 1522ab6

Please sign in to comment.