From f4e21845d97d65f669fb858a4ae79961d6aab1ae Mon Sep 17 00:00:00 2001 From: KATT Date: Fri, 20 Oct 2023 14:59:38 +0200 Subject: [PATCH 01/13] wip --- src/async/sse.test.ts | 74 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 71 insertions(+), 3 deletions(-) diff --git a/src/async/sse.test.ts b/src/async/sse.test.ts index e1228d4..f6c3a4d 100644 --- a/src/async/sse.test.ts +++ b/src/async/sse.test.ts @@ -18,10 +18,7 @@ test("SSE response test", async () => { } return { - foo: "bar", iterable: generator(), - promise: Promise.resolve(42), - rejectedPromise: Promise.reject(new Error("rejected promise")), }; } @@ -110,3 +107,74 @@ test("SSE response test", async () => { `); } }); + +test("SSE handle reconnects", async () => { + let i = 0; + let kill = false; + function createMockObj() { + async function* generator() { + while (true) { + yield i++; + await sleep(10); + + if (i === 5) { + kill = true; + } + } + } + + return { + iterable: generator(), + }; + } + + type MockObj = ReturnType; + + // ------------- server ------------------- + const opts = { + nonce: () => "__tson", + types: [tsonPromise, tsonAsyncIterable], + } satisfies TsonAsyncOptions; + + const server = await createTestServer({ + handleRequest: async (_req, res) => { + const tson = createTsonAsync(opts); + + const obj = createMockObj(); + const response = tson.toSSEResponse(obj); + + for (const [key, value] of response.headers) { + res.setHeader(key, value); + } + + for await (const value of response.body as any) { + res.write(value); + if (kill) { + // interrupt the stream + res.end(); + kill = false; + return; + } + } + + res.end(); + }, + }); + + // ------------- client ------------------- + const tson = createTsonAsync(opts); + + // e2e + const ac = new AbortController(); + const shape = await tson.createEventSource(server.url, { + signal: ac.signal, + }); + + const messages: number[] = []; + + for await (const value of shape.iterable) { + messages.push(value); + } + + expect(messages.length).toMatchInlineSnapshot(); +}); From 4dfbca010ff72cba81cf16ba2c6f32eae27306c5 Mon Sep 17 00:00:00 2001 From: KATT Date: Fri, 20 Oct 2023 15:01:56 +0200 Subject: [PATCH 02/13] fix --- src/async/sse.test.ts | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/async/sse.test.ts b/src/async/sse.test.ts index f6c3a4d..77000bb 100644 --- a/src/async/sse.test.ts +++ b/src/async/sse.test.ts @@ -70,14 +70,14 @@ test("SSE response test", async () => { }); expect(messages).toMatchInlineSnapshot(` - [ - "{\\"json\\":{\\"foo\\":\\"bar\\",\\"iterable\\":[\\"AsyncIterable\\",0,\\"__tson\\"],\\"promise\\":[\\"Promise\\",1,\\"__tson\\"],\\"rejectedPromise\\":[\\"Promise\\",2,\\"__tson\\"]},\\"nonce\\":\\"__tson\\"}", - "[0,[0,0]]", - "[1,[0,42]]", - "[2,[1,{}]]", - "[0,[0,1]]", - ] - `); + [ + "{\\"json\\":{\\"iterable\\":[\\"AsyncIterable\\",0,\\"__tson\\"]},\\"nonce\\":\\"__tson\\"}", + "[0,[0,0]]", + "[0,[0,1]]", + "[0,[0,2]]", + "[0,[0,3]]", + ] + `); } { @@ -108,7 +108,7 @@ test("SSE response test", async () => { } }); -test("SSE handle reconnects", async () => { +test.only("handle reconnects when server dies", async () => { let i = 0; let kill = false; function createMockObj() { From da5975f3c025a1f36723bc9f1b221ba88fa22f0b Mon Sep 17 00:00:00 2001 From: KATT Date: Fri, 20 Oct 2023 15:34:12 +0200 Subject: [PATCH 03/13] reconn --- src/async/deserializeAsync.ts | 18 +++++++++++++++ src/async/sse.test.ts | 42 ++++++++++++++++++++++++++++------- vitest.config.ts | 5 ++++- 3 files changed, 56 insertions(+), 9 deletions(-) diff --git a/src/async/deserializeAsync.ts b/src/async/deserializeAsync.ts index 61d8ad9..c1089b0 100644 --- a/src/async/deserializeAsync.ts +++ b/src/async/deserializeAsync.ts @@ -35,6 +35,11 @@ export interface TsonParseAsyncOptions { * On stream error */ onStreamError?: (err: TsonStreamInterruptedError) => void; + /** + * Allow reconnecting to the stream if it's interrupted + * @default false + */ + reconnect?: boolean; } type TsonParseAsync = ( @@ -117,6 +122,19 @@ function createTsonDeserializer(opts: TsonAsyncOptions) { const { value } = nextValue; + if (!Array.isArray(value)) { + // we got the beginning of a new stream - probably because a reconnect + // we assume this new stream will have the same shape and restart the walker with the nonce + if (!parseOptions.reconnect) { + throw new TsonStreamInterruptedError( + "Stream interrupted and reconnecting is not allowed", + ); + } + + await getStreamedValues(walker(value.nonce)); + return; + } + const [index, result] = value as TsonAsyncValueTuple; const controller = cache.get(index); diff --git a/src/async/sse.test.ts b/src/async/sse.test.ts index 77000bb..20c37a8 100644 --- a/src/async/sse.test.ts +++ b/src/async/sse.test.ts @@ -3,11 +3,16 @@ import { EventSourcePolyfill, NativeEventSource } from "event-source-polyfill"; import { expect, test } from "vitest"; (global as any).EventSource = NativeEventSource || EventSourcePolyfill; -import { TsonAsyncOptions, tsonAsyncIterable, tsonPromise } from "../index.js"; +import { + TsonAsyncOptions, + tsonAsyncIterable, + tsonBigint, + tsonPromise, +} from "../index.js"; import { createTestServer, sleep } from "../internals/testUtils.js"; import { createTsonAsync } from "./createTsonAsync.js"; -test("SSE response test", async () => { +test.only("SSE response test", async () => { function createMockObj() { async function* generator() { let i = 0; @@ -108,18 +113,25 @@ test("SSE response test", async () => { } }); -test.only("handle reconnects when server dies", async () => { +test("handle reconnects when response is interrupted", async () => { let i = 0; + let kill = false; function createMockObj() { async function* generator() { while (true) { - yield i++; + yield BigInt(i); + i++; await sleep(10); if (i === 5) { kill = true; } + + if (i > 10) { + // done + return; + } } } @@ -132,8 +144,8 @@ test.only("handle reconnects when server dies", async () => { // ------------- server ------------------- const opts = { - nonce: () => "__tson", - types: [tsonPromise, tsonAsyncIterable], + nonce: () => "__tson" + i, // add index to nonce to make sure it's not cached + types: [tsonPromise, tsonAsyncIterable, tsonBigint], } satisfies TsonAsyncOptions; const server = await createTestServer({ @@ -167,14 +179,28 @@ test.only("handle reconnects when server dies", async () => { // e2e const ac = new AbortController(); const shape = await tson.createEventSource(server.url, { + reconnect: true, signal: ac.signal, }); - const messages: number[] = []; + const messages: bigint[] = []; for await (const value of shape.iterable) { messages.push(value); } - expect(messages.length).toMatchInlineSnapshot(); + expect(messages).toMatchInlineSnapshot(` + [ + 0n, + 1n, + 2n, + 3n, + 4n, + 5n, + 7n, + 8n, + 9n, + 10n, + ] + `); }); diff --git a/vitest.config.ts b/vitest.config.ts index 27b62ac..77279c4 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -10,6 +10,9 @@ export default defineConfig({ reporter: ["html", "lcov"], }, exclude: ["lib", "node_modules", "examples", "benchmark"], - setupFiles: ["console-fail-test/setup"], + setupFiles: [ + // this is useful to comment out sometimes + "console-fail-test/setup", + ], }, }); From d364cc49f24600a9ab0ddc96e64bf95e445877de Mon Sep 17 00:00:00 2001 From: KATT Date: Fri, 20 Oct 2023 15:34:27 +0200 Subject: [PATCH 04/13] huh --- src/async/sse.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/async/sse.test.ts b/src/async/sse.test.ts index 20c37a8..3d497eb 100644 --- a/src/async/sse.test.ts +++ b/src/async/sse.test.ts @@ -12,7 +12,7 @@ import { import { createTestServer, sleep } from "../internals/testUtils.js"; import { createTsonAsync } from "./createTsonAsync.js"; -test.only("SSE response test", async () => { +test("SSE response test", async () => { function createMockObj() { async function* generator() { let i = 0; From 4a78c3e8b25ee908c36a4db8e55a8883c682713d Mon Sep 17 00:00:00 2001 From: KATT Date: Fri, 20 Oct 2023 15:40:56 +0200 Subject: [PATCH 05/13] trickier scenario --- src/async/sse.test.ts | 92 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/src/async/sse.test.ts b/src/async/sse.test.ts index 3d497eb..c4ff667 100644 --- a/src/async/sse.test.ts +++ b/src/async/sse.test.ts @@ -204,3 +204,95 @@ test("handle reconnects when response is interrupted", async () => { ] `); }); + +test("handle reconnects - iterator wrapped in Promise", async () => { + let i = 0; + + let kill = false; + function createMockObj() { + async function* generator() { + while (true) { + yield BigInt(i); + i++; + await sleep(10); + + if (i === 5) { + kill = true; + } + + if (i > 10) { + // done + return; + } + } + } + + return { + iterable: Promise.resolve(generator()), + }; + } + + type MockObj = ReturnType; + + // ------------- server ------------------- + const opts = { + nonce: () => "__tson" + i, // add index to nonce to make sure it's not cached + types: [tsonPromise, tsonAsyncIterable, tsonBigint], + } satisfies TsonAsyncOptions; + + const server = await createTestServer({ + handleRequest: async (_req, res) => { + const tson = createTsonAsync(opts); + + const obj = createMockObj(); + const response = tson.toSSEResponse(obj); + + for (const [key, value] of response.headers) { + res.setHeader(key, value); + } + + for await (const value of response.body as any) { + res.write(value); + if (kill) { + // interrupt the stream + res.end(); + kill = false; + return; + } + } + + res.end(); + }, + }); + + // ------------- client ------------------- + const tson = createTsonAsync(opts); + + // e2e + const ac = new AbortController(); + const shape = await tson.createEventSource(server.url, { + reconnect: true, + signal: ac.signal, + }); + + const messages: bigint[] = []; + + for await (const value of await shape.iterable) { + messages.push(value); + } + + expect(messages).toMatchInlineSnapshot(` + [ + 0n, + 1n, + 2n, + 3n, + 4n, + 5n, + 7n, + 8n, + 9n, + 10n, + ] + `); +}); From 30d27fb11e85d5e4748de7c75d43be64f07c2281 Mon Sep 17 00:00:00 2001 From: KATT Date: Fri, 20 Oct 2023 15:50:39 +0200 Subject: [PATCH 06/13] cool --- src/async/deserializeAsync.ts | 31 +++++++++++++++++++++++-------- src/async/sse.test.ts | 2 +- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/src/async/deserializeAsync.ts b/src/async/deserializeAsync.ts index c1089b0..80b3b89 100644 --- a/src/async/deserializeAsync.ts +++ b/src/async/deserializeAsync.ts @@ -1,3 +1,4 @@ +/* eslint-disable @typescript-eslint/no-unsafe-assignment */ /* eslint-disable @typescript-eslint/no-non-null-assertion */ import { TsonError } from "../errors.js"; @@ -67,10 +68,11 @@ function createTsonDeserializer(opts: TsonAsyncOptions) { iterable: TsonDeserializeIterable, parseOptions: TsonParseAsyncOptions, ) => { - const cache = new Map< + const controllers = new Map< TsonAsyncIndex, ReadableStreamDefaultController >(); + const cache = new Map(); const iterator = iterable[Symbol.asyncIterator](); const walker: WalkerFactory = (nonce) => { @@ -88,6 +90,14 @@ function createTsonDeserializer(opts: TsonAsyncOptions) { const idx = serializedValue as TsonAsyncIndex; + if (cache.has(idx)) { + assert( + parseOptions.reconnect, + "Duplicate index found but reconnect is off", + ); + return cache.get(idx); + } + const [readable, controller] = createReadableStream(); // the `start` method is called "immediately when the object is constructed" @@ -95,15 +105,18 @@ function createTsonDeserializer(opts: TsonAsyncOptions) { // so we're guaranteed that the controller is set in the cache assert(controller, "Controller not set - this is a bug"); - cache.set(idx, controller); + controllers.set(idx, controller); - return transformer.deserialize({ + const result = transformer.deserialize({ close() { controller.close(); - cache.delete(idx); + controllers.delete(idx); }, reader: readable.getReader(), }); + + cache.set(idx, result); + return result; } return mapOrReturn(value, walk); @@ -137,14 +150,16 @@ function createTsonDeserializer(opts: TsonAsyncOptions) { const [index, result] = value as TsonAsyncValueTuple; - const controller = cache.get(index); + const controller = controllers.get(index); const walkedResult = walk(result); - assert(controller, `No stream found for index ${index}`); + if (!parseOptions.reconnect) { + assert(controller, `No stream found for index ${index}`); + } // resolving deferred - controller.enqueue(walkedResult); + controller?.enqueue(walkedResult); } } @@ -170,7 +185,7 @@ function createTsonDeserializer(opts: TsonAsyncOptions) { const err = new TsonStreamInterruptedError(cause); // enqueue the error to all the streams - for (const controller of cache.values()) { + for (const controller of controllers.values()) { controller.enqueue(err); } diff --git a/src/async/sse.test.ts b/src/async/sse.test.ts index c4ff667..7d112cb 100644 --- a/src/async/sse.test.ts +++ b/src/async/sse.test.ts @@ -18,7 +18,7 @@ test("SSE response test", async () => { let i = 0; while (true) { yield i++; - await sleep(100); + await sleep(10); } } From baa7e8372efebac9ee4020151d8c90c378c7b174 Mon Sep 17 00:00:00 2001 From: KATT Date: Fri, 20 Oct 2023 15:51:32 +0200 Subject: [PATCH 07/13] cool --- src/async/deserializeAsync.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/async/deserializeAsync.ts b/src/async/deserializeAsync.ts index 80b3b89..7bffd8c 100644 --- a/src/async/deserializeAsync.ts +++ b/src/async/deserializeAsync.ts @@ -91,6 +91,7 @@ function createTsonDeserializer(opts: TsonAsyncOptions) { const idx = serializedValue as TsonAsyncIndex; if (cache.has(idx)) { + // We already have this async value in the cache - so this is probably a reconnect assert( parseOptions.reconnect, "Duplicate index found but reconnect is off", From 44e9a252c31d985d1407313d171d4fcda3bec2b3 Mon Sep 17 00:00:00 2001 From: KATT Date: Fri, 20 Oct 2023 15:58:56 +0200 Subject: [PATCH 08/13] cool --- src/async/deserializeAsync.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/async/deserializeAsync.ts b/src/async/deserializeAsync.ts index 7bffd8c..dd6ad92 100644 --- a/src/async/deserializeAsync.ts +++ b/src/async/deserializeAsync.ts @@ -141,7 +141,7 @@ function createTsonDeserializer(opts: TsonAsyncOptions) { // we assume this new stream will have the same shape and restart the walker with the nonce if (!parseOptions.reconnect) { throw new TsonStreamInterruptedError( - "Stream interrupted and reconnecting is not allowed", + "Stream got beginning of results but reconnecting is not enabled", ); } From 1fde57929e706fcdf2e6cb3dfdee349666eb167a Mon Sep 17 00:00:00 2001 From: KATT Date: Fri, 20 Oct 2023 16:04:23 +0200 Subject: [PATCH 09/13] add onReconnect --- src/async/deserializeAsync.ts | 6 ++++++ src/async/sse.test.ts | 16 ++++++++++------ 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/src/async/deserializeAsync.ts b/src/async/deserializeAsync.ts index dd6ad92..fdb1044 100644 --- a/src/async/deserializeAsync.ts +++ b/src/async/deserializeAsync.ts @@ -32,6 +32,11 @@ type AnyTsonTransformerSerializeDeserialize = | TsonTransformerSerializeDeserialize; export interface TsonParseAsyncOptions { + /** + * Event handler for when the stream reconnects + * You can use this to do extra actions to ensure no messages were lost + */ + onReconnect?: () => void; /** * On stream error */ @@ -145,6 +150,7 @@ function createTsonDeserializer(opts: TsonAsyncOptions) { ); } + parseOptions.onReconnect?.(); await getStreamedValues(walker(value.nonce)); return; } diff --git a/src/async/sse.test.ts b/src/async/sse.test.ts index 7d112cb..5311a08 100644 --- a/src/async/sse.test.ts +++ b/src/async/sse.test.ts @@ -1,6 +1,6 @@ /* eslint-disable @typescript-eslint/no-unnecessary-condition */ import { EventSourcePolyfill, NativeEventSource } from "event-source-polyfill"; -import { expect, test } from "vitest"; +import { expect, test, vi } from "vitest"; (global as any).EventSource = NativeEventSource || EventSourcePolyfill; import { @@ -212,15 +212,15 @@ test("handle reconnects - iterator wrapped in Promise", async () => { function createMockObj() { async function* generator() { while (true) { + await sleep(10); yield BigInt(i); i++; - await sleep(10); - if (i === 5) { + if (i % 5 === 0) { kill = true; } - if (i > 10) { + if (i > 11) { // done return; } @@ -270,7 +270,9 @@ test("handle reconnects - iterator wrapped in Promise", async () => { // e2e const ac = new AbortController(); + const onReconnect = vi.fn(); const shape = await tson.createEventSource(server.url, { + onReconnect, reconnect: true, signal: ac.signal, }); @@ -288,11 +290,13 @@ test("handle reconnects - iterator wrapped in Promise", async () => { 2n, 3n, 4n, - 5n, + 6n, 7n, 8n, 9n, - 10n, + 11n, ] `); + + expect(onReconnect).toHaveBeenCalledTimes(2); }); From 7b80bd43f7112cde2491bf6c5718c97add517495 Mon Sep 17 00:00:00 2001 From: KATT Date: Fri, 20 Oct 2023 16:05:38 +0200 Subject: [PATCH 10/13] assert --- src/async/deserializeAsync.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/async/deserializeAsync.ts b/src/async/deserializeAsync.ts index fdb1044..f89de5f 100644 --- a/src/async/deserializeAsync.ts +++ b/src/async/deserializeAsync.ts @@ -144,11 +144,11 @@ function createTsonDeserializer(opts: TsonAsyncOptions) { if (!Array.isArray(value)) { // we got the beginning of a new stream - probably because a reconnect // we assume this new stream will have the same shape and restart the walker with the nonce - if (!parseOptions.reconnect) { - throw new TsonStreamInterruptedError( - "Stream got beginning of results but reconnecting is not enabled", - ); - } + + assert( + parseOptions.reconnect, + "Stream got beginning of results but reconnecting is not enabled", + ); parseOptions.onReconnect?.(); await getStreamedValues(walker(value.nonce)); From 9bbd73309cd4853f9bdec2a48bcbf372969a3714 Mon Sep 17 00:00:00 2001 From: KATT Date: Fri, 20 Oct 2023 16:06:10 +0200 Subject: [PATCH 11/13] coolz --- src/async/deserializeAsync.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/async/deserializeAsync.ts b/src/async/deserializeAsync.ts index f89de5f..f7cdfac 100644 --- a/src/async/deserializeAsync.ts +++ b/src/async/deserializeAsync.ts @@ -145,12 +145,13 @@ function createTsonDeserializer(opts: TsonAsyncOptions) { // we got the beginning of a new stream - probably because a reconnect // we assume this new stream will have the same shape and restart the walker with the nonce + parseOptions.onReconnect?.(); + assert( parseOptions.reconnect, "Stream got beginning of results but reconnecting is not enabled", ); - parseOptions.onReconnect?.(); await getStreamedValues(walker(value.nonce)); return; } From 4126f82e60c60f82ee5b2b12522a4753d237270e Mon Sep 17 00:00:00 2001 From: KATT Date: Fri, 20 Oct 2023 16:19:21 +0200 Subject: [PATCH 12/13] this test adds nothing --- src/async/sse.test.ts | 92 ------------------------------------------- 1 file changed, 92 deletions(-) diff --git a/src/async/sse.test.ts b/src/async/sse.test.ts index 5311a08..24a21ef 100644 --- a/src/async/sse.test.ts +++ b/src/async/sse.test.ts @@ -113,98 +113,6 @@ test("SSE response test", async () => { } }); -test("handle reconnects when response is interrupted", async () => { - let i = 0; - - let kill = false; - function createMockObj() { - async function* generator() { - while (true) { - yield BigInt(i); - i++; - await sleep(10); - - if (i === 5) { - kill = true; - } - - if (i > 10) { - // done - return; - } - } - } - - return { - iterable: generator(), - }; - } - - type MockObj = ReturnType; - - // ------------- server ------------------- - const opts = { - nonce: () => "__tson" + i, // add index to nonce to make sure it's not cached - types: [tsonPromise, tsonAsyncIterable, tsonBigint], - } satisfies TsonAsyncOptions; - - const server = await createTestServer({ - handleRequest: async (_req, res) => { - const tson = createTsonAsync(opts); - - const obj = createMockObj(); - const response = tson.toSSEResponse(obj); - - for (const [key, value] of response.headers) { - res.setHeader(key, value); - } - - for await (const value of response.body as any) { - res.write(value); - if (kill) { - // interrupt the stream - res.end(); - kill = false; - return; - } - } - - res.end(); - }, - }); - - // ------------- client ------------------- - const tson = createTsonAsync(opts); - - // e2e - const ac = new AbortController(); - const shape = await tson.createEventSource(server.url, { - reconnect: true, - signal: ac.signal, - }); - - const messages: bigint[] = []; - - for await (const value of shape.iterable) { - messages.push(value); - } - - expect(messages).toMatchInlineSnapshot(` - [ - 0n, - 1n, - 2n, - 3n, - 4n, - 5n, - 7n, - 8n, - 9n, - 10n, - ] - `); -}); - test("handle reconnects - iterator wrapped in Promise", async () => { let i = 0; From 99793f52cb9d8f940695ec5d5dd4d559ccbcacc4 Mon Sep 17 00:00:00 2001 From: KATT Date: Fri, 20 Oct 2023 16:20:44 +0200 Subject: [PATCH 13/13] reconnect in example --- examples/async/src/app/sse-infinite/StreamedTimeSSE.tsx | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/async/src/app/sse-infinite/StreamedTimeSSE.tsx b/examples/async/src/app/sse-infinite/StreamedTimeSSE.tsx index 3660860..c153642 100644 --- a/examples/async/src/app/sse-infinite/StreamedTimeSSE.tsx +++ b/examples/async/src/app/sse-infinite/StreamedTimeSSE.tsx @@ -11,6 +11,7 @@ export function StreamedTimeSSE() { useEffect(() => { const abortSignal = new AbortController(); createEventSource("/sse-infinite", { + reconnect: true, signal: abortSignal.signal, }) .then(async (shape) => {