From ce7a97b338a93fe649678cb83e48b8cb1c8c0930 Mon Sep 17 00:00:00 2001 From: Matteo Pietro Dazzi Date: Fri, 1 Dec 2023 23:39:27 +0100 Subject: [PATCH] feat: backpressure support for concat --- src/concat.ts | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/src/concat.ts b/src/concat.ts index 21f961e..f9dad5b 100644 --- a/src/concat.ts +++ b/src/concat.ts @@ -1,24 +1,22 @@ export const concat = (...readableStreams: ReadableStream[]) => { const fallbackedStreams = readableStreams ?? []; - const readableStreamsLength = fallbackedStreams.length; + let currentReaderIndex = 0; + let currentReader = fallbackedStreams[currentReaderIndex]?.getReader(); return new ReadableStream({ - async start(controller) { - for (let i = 0; i < readableStreamsLength; i++) { - const reader = fallbackedStreams[i].getReader(); - - while (true) { - const readResult = await reader.read(); + async pull(controller) { + if (!currentReader) { + controller.close(); + return; + } - if (readResult.done) { - break; - } + const readResult = await currentReader.read(); - controller.enqueue(readResult.value); - } + if (readResult.done) { + currentReader = fallbackedStreams[++currentReaderIndex]?.getReader(); + } else { + controller.enqueue(readResult.value); } - - controller.close(); - }, + } }); };