Skip to content

Commit

Permalink
feat: backpressure support for concat
Browse files Browse the repository at this point in the history
  • Loading branch information
ilteoood committed Dec 1, 2023
1 parent c7cf8e1 commit ce7a97b
Showing 1 changed file with 13 additions and 15 deletions.
28 changes: 13 additions & 15 deletions src/concat.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
export const concat = (...readableStreams: ReadableStream[]) => {
const fallbackedStreams = readableStreams ?? [];
const readableStreamsLength = fallbackedStreams.length;
let currentReaderIndex = 0;
let currentReader = fallbackedStreams[currentReaderIndex]?.getReader();

return new ReadableStream({
async start(controller) {
for (let i = 0; i < readableStreamsLength; i++) {
const reader = fallbackedStreams[i].getReader();

while (true) {
const readResult = await reader.read();
async pull(controller) {
if (!currentReader) {
controller.close();
return;
}

if (readResult.done) {
break;
}
const readResult = await currentReader.read();

controller.enqueue(readResult.value);
}
if (readResult.done) {
currentReader = fallbackedStreams[++currentReaderIndex]?.getReader();
} else {
controller.enqueue(readResult.value);
}

controller.close();
},
}
});
};

0 comments on commit ce7a97b

Please sign in to comment.