Skip to content

Commit

Permalink
Put all incoming signals on buffer
Browse files Browse the repository at this point in the history
- this is just an experiment, not intended to be code we roll-out (it doesn't even properly)
- the "SignalBuffer" essentially acts as an abstraction on top of the signal channel for the stall code to use to check for new incoming signals, and not miss any that might have been sent before they knew when to listen
- I feel "SignalBuffer" is overly complicated, though it is that way because of the code that uses it - that may indicate that code is overly complicated
  • Loading branch information
poltak authored and ShishKabab committed Apr 17, 2020
1 parent 7544f42 commit 5cf27f1
Showing 1 changed file with 91 additions and 33 deletions.
124 changes: 91 additions & 33 deletions ts/integration/initial-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,47 +330,58 @@ export class InitialSync {
})
}

private signalReconnectReceiver = (signalChannel: SignalChannel) =>
new Promise<void>(async (resolve, reject) => {
await signalChannel.sendMessage(
JSON.stringify(InitialSync.RECONNECT_REQ_MSG),
)

signalChannel.events.on('signal', ({ payload }) => {
if (payload !== InitialSync.RECONNECT_ACK_MSG) {
return reject(new Error('Cannot re-establish connection'))
}

return resolve()
})
})
private async signalReconnectReceiver(
signalChannel: SignalChannel,
signals: SignalBuffer,
) {
await signalChannel.sendMessage(
JSON.stringify(InitialSync.RECONNECT_REQ_MSG),
)

private signalReconnectSender = (signalChannel: SignalChannel) =>
new Promise<void>(async (resolve, reject) => {
signalChannel.events.on('signal', async ({ payload }) => {
if (payload !== InitialSync.RECONNECT_REQ_MSG) {
return reject(new Error('Cannot re-establish connection'))
}
let payload: string
// TODO: return if not found after certain time
do {
payload = await signals.getLatestOrWait()
} while (payload !== InitialSync.RECONNECT_ACK_MSG)

await signalChannel.sendMessage(
JSON.stringify(InitialSync.RECONNECT_ACK_MSG),
)
// if (payload !== InitialSync.RECONNECT_ACK_MSG) {
// throw new Error('Cannot re-establish connection')
// }
}

return resolve()
})
})
private async signalReconnectSender(
signalChannel: SignalChannel,
signals: SignalBuffer,
) {
let payload: string
// TODO: return if not found after certain time
do {
payload = await signals.getLatestOrWait()
} while (payload !== InitialSync.RECONNECT_REQ_MSG)

// if (payload !== InitialSync.RECONNECT_REQ_MSG) {
// throw new Error('Cannot re-establish connection')
// }

await signalChannel.sendMessage(
JSON.stringify(InitialSync.RECONNECT_ACK_MSG),
)
}

private handleStalledConnection = (options: {
role: FastSyncRole
signalChannel: SignalChannel
}) => async () => {
private handleStalledConnection = (
options: {
role: FastSyncRole
signalChannel: SignalChannel
},
signals: SignalBuffer,
) => async () => {
const initiator = options.role === 'receiver'
const peer = await this.getPeer({ initiator })

if (initiator) {
await this.signalReconnectReceiver(options.signalChannel)
await this.signalReconnectReceiver(options.signalChannel, signals)
} else {
await this.signalReconnectSender(options.signalChannel)
await this.signalReconnectSender(options.signalChannel, signals)
}

await this.setupSimplePeerSignalling({ ...options, peer })
Expand Down Expand Up @@ -400,9 +411,17 @@ export class InitialSync {
initiator: options.role === 'receiver',
})

const signals = new SignalBuffer()
options.signalChannel.events.on('signal', ({ payload }) =>
signals.bufferSignal({ payload }),
)

const channel: FastSyncChannel = new WebRTCFastSyncChannel({
peer,
reEstablishConnection: this.handleStalledConnection(options),
reEstablishConnection: this.handleStalledConnection(
options,
signals,
),
maxReconnectAttempts:
this.dependencies.maxReconnectAttempts ??
InitialSync.MAX_RECONNECT_ATTEMPTS,
Expand All @@ -420,3 +439,42 @@ export class InitialSync {
}
}
}

class SignalBuffer {
private queue: string[] = []
private currentlyWaiting = false
private shouldWait!: Promise<string>
private resolveWait!: (signal: string) => void

constructor() {
this.setupWait()
}

private setupWait() {
this.shouldWait = new Promise(resolve => {
this.resolveWait = resolve
})
}

bufferSignal(options: { payload: string }) {
if (this.currentlyWaiting) {
this.resolveWait(options.payload)
this.setupWait()
} else {
this.queue.push(options.payload)
}
}

async getLatestOrWait() {
if (this.queue.length) {
const [head, ...tail] = this.queue
this.queue = tail
return head
}

this.currentlyWaiting = true
const signal = await this.shouldWait
this.currentlyWaiting = false
return signal
}
}

0 comments on commit 5cf27f1

Please sign in to comment.