From daec32bfb861412dd8700d9e9d050b55b57bd1a9 Mon Sep 17 00:00:00 2001 From: Artem Zakharchenko Date: Mon, 30 Sep 2024 15:00:05 +0200 Subject: [PATCH 01/17] feat: support server-sent events --- src/browser/index.ts | 7 ++ src/browser/sse.ts | 111 +++++++++++++++++++ test/browser/sse-api/sse.mocks.ts | 7 ++ test/browser/sse-api/sse.node.test.ts | 149 ++++++++++++++++++++++++++ 4 files changed, 274 insertions(+) create mode 100644 src/browser/sse.ts create mode 100644 test/browser/sse-api/sse.mocks.ts create mode 100644 test/browser/sse-api/sse.node.test.ts diff --git a/src/browser/index.ts b/src/browser/index.ts index 0eafbe76f..ba272ddb5 100644 --- a/src/browser/index.ts +++ b/src/browser/index.ts @@ -1,3 +1,10 @@ export { setupWorker } from './setupWorker/setupWorker' export type { SetupWorker, StartOptions } from './setupWorker/glossary' export { SetupWorkerApi } from './setupWorker/setupWorker' + +/* Server-Sent Events */ +export { + sse, + type ServerSentEventRequestHandler, + type ServerSentEventResolver, +} from './sse' diff --git a/src/browser/sse.ts b/src/browser/sse.ts new file mode 100644 index 000000000..a3033902f --- /dev/null +++ b/src/browser/sse.ts @@ -0,0 +1,111 @@ +import type { ResponseResolver } from '~/core/handlers/RequestHandler' +import { + HttpHandler, + type HttpRequestResolverExtras, + type HttpRequestParsedResult, +} from '~/core/handlers/HttpHandler' +import { HttpResponse } from '~/core/HttpResponse' +import type { Path, PathParams } from '~/core/utils/matching/matchRequestUrl' + +export type ServerSentEventResolverExtras = + HttpRequestResolverExtras & { + source: ServerSentEventSource + } + +export type ServerSentEventResolver = + ResponseResolver, any, any> + +export type ServerSentEventRequestHandler = < + Params extends PathParams = PathParams, + RequestPath extends Path = Path, +>( + path: RequestPath, + resolver: ServerSentEventResolver, +) => HttpHandler + +/** + * Request handler for Server-Sent Events (SSE). + * + * @example + * sse('http://localhost:4321', ({ source }) => { + * source.send({ data: 'hello world' }) + * }) + * + * @see {@link https://mswjs.io/docs/api/sse `sse()` API reference} + */ +export const sse: ServerSentEventRequestHandler = (path, resolver) => { + return new ServerSentEventHandler(path, resolver) +} + +class ServerSentEventHandler extends HttpHandler { + constructor(path: Path, resolver: ServerSentEventResolver) { + super('GET', path, (info) => { + const stream = new ReadableStream({ + start(controller) { + resolver({ + ...info, + source: new ServerSentEventSource(controller), + }) + }, + }) + + return new HttpResponse(stream, { + headers: { + 'content-type': 'text/event-stream', + 'cache-control': 'no-cache', + connection: 'keep-alive', + }, + }) + }) + } + + predicate(args: { + request: Request + parsedResult: HttpRequestParsedResult + }): boolean { + // Ignore non-SSE requests. + if ( + args.request.headers.get('accept')?.toLowerCase() !== 'text/event-stream' + ) { + return false + } + + // If it is a SSE request, match it against its method and path. + return super.predicate(args) + } +} + +class ServerSentEventSource { + private encoder: TextEncoder + + constructor(protected controller: ReadableStreamDefaultController) { + this.encoder = new TextEncoder() + } + + /** + * Dispatches the given event to the underlying `EventSource`. + */ + public send(payload: { id?: string; event?: string; data: unknown }): void { + const frames: Array = [] + + if (payload.event) { + frames.push(`event:${payload.event}`) + } + + frames.push(`data:${JSON.stringify(payload.data)}`) + + if (payload.id) { + frames.push(`id:${payload.id}`) + } + + frames.push('', '') + this.controller.enqueue(this.encoder.encode(frames.join('\n'))) + } + + /** + * Errors the underlying `EventSource`, closing the connection. + */ + public error(): void { + this.controller.error() + } +} diff --git a/test/browser/sse-api/sse.mocks.ts b/test/browser/sse-api/sse.mocks.ts new file mode 100644 index 000000000..29525730c --- /dev/null +++ b/test/browser/sse-api/sse.mocks.ts @@ -0,0 +1,7 @@ +import { setupWorker, sse } from 'msw/browser' + +// @ts-ignore +window.msw = { + setupWorker, + sse, +} diff --git a/test/browser/sse-api/sse.node.test.ts b/test/browser/sse-api/sse.node.test.ts new file mode 100644 index 000000000..9e346411b --- /dev/null +++ b/test/browser/sse-api/sse.node.test.ts @@ -0,0 +1,149 @@ +import { setupWorker, sse } from 'msw/browser' +import { test, expect } from '../playwright.extend' + +declare namespace window { + export const msw: { + setupWorker: typeof setupWorker + sse: typeof sse + } +} + +test('sends a mock message event', async ({ loadExample, page }) => { + await loadExample(require.resolve('./sse.mocks.ts'), { + skipActivation: true, + }) + + await page.evaluate(async () => { + const { setupWorker, sse } = window.msw + + const worker = setupWorker( + sse('http://localhost/stream', ({ source }) => { + source.send({ + data: { username: 'john' }, + }) + }), + ) + await worker.start() + }) + + const message = await page.evaluate(() => { + return new Promise((resolve, reject) => { + const source = new EventSource('http://localhost/stream') + source.addEventListener('message', (event) => { + resolve(`${event.type}:${event.data}`) + }) + source.onerror = reject + }) + }) + + expect(message).toBe('message:{"username":"john"}') +}) + +test('sends a mock custom event', async ({ loadExample, page }) => { + await loadExample(require.resolve('./sse.mocks.ts'), { + skipActivation: true, + }) + + await page.evaluate(async () => { + const { setupWorker, sse } = window.msw + + const worker = setupWorker( + sse('http://localhost/stream', ({ source }) => { + source.send({ + event: 'userconnect', + data: { username: 'john' }, + }) + }), + ) + await worker.start() + }) + + const message = await page.evaluate(() => { + return new Promise((resolve, reject) => { + const source = new EventSource('http://localhost/stream') + source.addEventListener('userconnect', (event) => { + resolve(`${event.type}:${event.data}`) + }) + source.onerror = reject + }) + }) + + expect(message).toEqual('userconnect:{"username":"john"}') +}) + +test('sends a mock message event with custom id', async ({ + loadExample, + page, +}) => { + await loadExample(require.resolve('./sse.mocks.ts'), { + skipActivation: true, + }) + + await page.evaluate(async () => { + const { setupWorker, sse } = window.msw + + const worker = setupWorker( + sse('http://localhost/stream', ({ source }) => { + source.send({ + id: 'abc-123', + event: 'userconnect', + data: { username: 'john' }, + }) + }), + ) + await worker.start() + }) + + const message = await page.evaluate(() => { + return new Promise((resolve, reject) => { + const source = new EventSource('http://localhost/stream') + source.addEventListener('userconnect', (event) => { + console.log(event) + resolve(`${event.type}:${event.lastEventId}:${event.data}`) + }) + source.onerror = reject + }) + }) + + expect(message).toBe('userconnect:abc-123:{"username":"john"}') +}) + +test('errors the connected source', async ({ loadExample, page, waitFor }) => { + await loadExample(require.resolve('./sse.mocks.ts'), { + skipActivation: true, + }) + + const pageErrors: Array = [] + page.on('pageerror', (error) => { + pageErrors.push(error.message) + }) + + await page.evaluate(async () => { + const { setupWorker, sse } = window.msw + + const worker = setupWorker( + sse('http://localhost/stream', ({ source }) => { + queueMicrotask(() => source.error()) + }), + ) + await worker.start() + }) + + const readyState = await page.evaluate(() => { + return new Promise((resolve) => { + const source = new EventSource('http://localhost/stream') + source.addEventListener('error', (event) => { + console.log('ERROR!', event, source.readyState) + resolve(source.readyState) + }) + }) + }) + + // EventSource must be closed. + expect(readyState).toBe(2) + + await waitFor(() => { + // Must error with "Failed to fetch" (default EventSource behavior). + expect(pageErrors).toContain('Failed to fetch') + }) +}) From 8c37fe0dda72ac9e4e9a4505444a8505a5942659 Mon Sep 17 00:00:00 2001 From: Artem Zakharchenko Date: Mon, 30 Sep 2024 22:56:53 +0200 Subject: [PATCH 02/17] fix: support passthrough via `server` --- src/browser/sse.ts | 123 ++++++++++++++++++++++++-- src/core/utils/handleRequest.ts | 5 +- test/browser/sse-api/sse.node.test.ts | 71 +++++++++++++-- 3 files changed, 184 insertions(+), 15 deletions(-) diff --git a/src/browser/sse.ts b/src/browser/sse.ts index a3033902f..b80ff9afc 100644 --- a/src/browser/sse.ts +++ b/src/browser/sse.ts @@ -1,15 +1,16 @@ +import type { Constructor } from 'type-fest' import type { ResponseResolver } from '~/core/handlers/RequestHandler' import { HttpHandler, type HttpRequestResolverExtras, type HttpRequestParsedResult, } from '~/core/handlers/HttpHandler' -import { HttpResponse } from '~/core/HttpResponse' import type { Path, PathParams } from '~/core/utils/matching/matchRequestUrl' export type ServerSentEventResolverExtras = HttpRequestResolverExtras & { - source: ServerSentEventSource + client: ServerSentEventClient + server: ServerSentEventServer } export type ServerSentEventResolver = @@ -42,14 +43,23 @@ class ServerSentEventHandler extends HttpHandler { super('GET', path, (info) => { const stream = new ReadableStream({ start(controller) { + const client = new ServerSentEventClient({ + controller, + }) + const server = new ServerSentEventServer({ + request: info.request, + client, + }) + resolver({ ...info, - source: new ServerSentEventSource(controller), + client, + server, }) }, }) - return new HttpResponse(stream, { + return new Response(stream, { headers: { 'content-type': 'text/event-stream', 'cache-control': 'no-cache', @@ -75,11 +85,13 @@ class ServerSentEventHandler extends HttpHandler { } } -class ServerSentEventSource { +class ServerSentEventClient { private encoder: TextEncoder + protected controller: ReadableStreamDefaultController - constructor(protected controller: ReadableStreamDefaultController) { + constructor(args: { controller: ReadableStreamDefaultController }) { this.encoder = new TextEncoder() + this.controller = args.controller } /** @@ -109,3 +121,102 @@ class ServerSentEventSource { this.controller.error() } } + +const kListener = Symbol('kListener') + +class ServerSentEventServer { + protected request: Request + protected client: ServerSentEventClient + + constructor(args: { request: Request; client: ServerSentEventClient }) { + this.request = args.request + this.client = args.client + } + + /** + * Establishes an actual connection for this SSE request + * and returns the `EventSource` instance. + */ + public connect(): EventSource { + const requestUrl = new URL(this.request.url) + requestUrl.searchParams.set('x-msw-intention', 'bypass') + + let source = new EventSource(requestUrl, { + withCredentials: this.request.credentials === 'include', + }) + + const addEventListener = source.addEventListener.bind(source) + source.addEventListener = (event: any, listener: any, options: any) => { + const wrappedListener = this.wrapEventListener(listener).bind(source) + Object.defineProperty(listener, kListener, { + value: wrappedListener, + }) + addEventListener(event, wrappedListener, options) + } + + const removeEventListener = source.removeEventListener.bind(source) + source.removeEventListener = (event: any, listener: any, options: any) => { + const wrappedListener = listener[kListener] || listener + removeEventListener(event, wrappedListener, options) + } + + source = new Proxy(source, { + set: (target, property, value) => { + switch (property) { + case 'onopen': + case 'onmessage': + case 'onerror': { + return Reflect.set(target, property, this.wrapEventListener(value)) + } + } + + return Reflect.set(target, property, value) + }, + }) + + return source + } + + private wrapEventListener(listener: (event: Event) => void) { + const { client } = this + + return function (this: EventSource, event: Event) { + const EventConstructor = event.constructor as Constructor + const cancelableEvent = new EventConstructor(event.type, { + ...event, + cancelable: true, + }) + + listener.call(this, cancelableEvent) + + if (event.type === 'open') { + return + } + + if (!cancelableEvent.defaultPrevented) { + switch (event.type) { + case 'error': { + client.error() + break + } + + default: { + if (event instanceof MessageEvent) { + client.send({ + id: event.lastEventId, + event: event.type === 'message' ? undefined : event.type, + /** + * @fixme Data will already be stringified. + * `client.send()` will stringify it again. + */ + data: event.data, + }) + } + + break + } + } + } + } + } +} diff --git a/src/core/utils/handleRequest.ts b/src/core/utils/handleRequest.ts index 766b22ce6..ab2fcb88b 100644 --- a/src/core/utils/handleRequest.ts +++ b/src/core/utils/handleRequest.ts @@ -53,7 +53,10 @@ export async function handleRequest( emitter.emit('request:start', { request, requestId }) // Perform bypassed requests (i.e. wrapped in "bypass()") as-is. - if (request.headers.get('x-msw-intention') === 'bypass') { + if ( + request.headers.get('x-msw-intention') === 'bypass' || + new URL(request.url).searchParams.get('x-msw-intention') === 'bypass' + ) { emitter.emit('request:end', { request, requestId }) handleRequestOptions?.onPassthroughResponse?.(request) return diff --git a/test/browser/sse-api/sse.node.test.ts b/test/browser/sse-api/sse.node.test.ts index 9e346411b..a4607f0f3 100644 --- a/test/browser/sse-api/sse.node.test.ts +++ b/test/browser/sse-api/sse.node.test.ts @@ -1,4 +1,5 @@ import { setupWorker, sse } from 'msw/browser' +import { HttpServer } from '@open-draft/test-server/http' import { test, expect } from '../playwright.extend' declare namespace window { @@ -8,6 +9,26 @@ declare namespace window { } } +const httpServer = new HttpServer((app) => { + app.get('/stream', (req, res) => { + res.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }) + + res.write('data: {"message": "hello"}\n\n') + }) +}) + +test.beforeAll(async () => { + await httpServer.listen() +}) + +test.afterAll(async () => { + await httpServer.close() +}) + test('sends a mock message event', async ({ loadExample, page }) => { await loadExample(require.resolve('./sse.mocks.ts'), { skipActivation: true, @@ -17,8 +38,8 @@ test('sends a mock message event', async ({ loadExample, page }) => { const { setupWorker, sse } = window.msw const worker = setupWorker( - sse('http://localhost/stream', ({ source }) => { - source.send({ + sse('http://localhost/stream', ({ client }) => { + client.send({ data: { username: 'john' }, }) }), @@ -48,8 +69,8 @@ test('sends a mock custom event', async ({ loadExample, page }) => { const { setupWorker, sse } = window.msw const worker = setupWorker( - sse('http://localhost/stream', ({ source }) => { - source.send({ + sse('http://localhost/stream', ({ client }) => { + client.send({ event: 'userconnect', data: { username: 'john' }, }) @@ -83,8 +104,8 @@ test('sends a mock message event with custom id', async ({ const { setupWorker, sse } = window.msw const worker = setupWorker( - sse('http://localhost/stream', ({ source }) => { - source.send({ + sse('http://localhost/stream', ({ client }) => { + client.send({ id: 'abc-123', event: 'userconnect', data: { username: 'john' }, @@ -122,8 +143,8 @@ test('errors the connected source', async ({ loadExample, page, waitFor }) => { const { setupWorker, sse } = window.msw const worker = setupWorker( - sse('http://localhost/stream', ({ source }) => { - queueMicrotask(() => source.error()) + sse('http://localhost/stream', ({ client }) => { + queueMicrotask(() => client.error()) }), ) await worker.start() @@ -147,3 +168,37 @@ test('errors the connected source', async ({ loadExample, page, waitFor }) => { expect(pageErrors).toContain('Failed to fetch') }) }) + +test.only('forwards original server message events to the client', async ({ + loadExample, + page, +}) => { + await loadExample(require.resolve('./sse.mocks.ts'), { + skipActivation: true, + }) + const url = httpServer.http.url('/stream') + + await page.evaluate(async (url) => { + const { setupWorker, sse } = window.msw + + const worker = setupWorker( + sse(url, async ({ server }) => { + const source = server.connect() + + source.addEventListener('message', (event) => { + event.preventDefault() + }) + }), + ) + await worker.start() + }, url) + + await page.evaluate((url) => { + const source = new EventSource(url) + source.addEventListener('message', (event) => { + console.warn('client received:', event) + }) + }, url) + + await page.pause() +}) From 70756861036fad4bb23ee32d31a74279ad1485be Mon Sep 17 00:00:00 2001 From: Artem Zakharchenko Date: Mon, 30 Sep 2024 23:09:13 +0200 Subject: [PATCH 03/17] fix: prevent double data serialization --- src/browser/sse.ts | 56 ++++++++++++++++++--------- test/browser/sse-api/sse.node.test.ts | 4 +- 2 files changed, 40 insertions(+), 20 deletions(-) diff --git a/src/browser/sse.ts b/src/browser/sse.ts index b80ff9afc..d76c89018 100644 --- a/src/browser/sse.ts +++ b/src/browser/sse.ts @@ -1,4 +1,3 @@ -import type { Constructor } from 'type-fest' import type { ResponseResolver } from '~/core/handlers/RequestHandler' import { HttpHandler, @@ -85,6 +84,8 @@ class ServerSentEventHandler extends HttpHandler { } } +const kSend = Symbol('kSend') + class ServerSentEventClient { private encoder: TextEncoder protected controller: ReadableStreamDefaultController @@ -98,13 +99,32 @@ class ServerSentEventClient { * Dispatches the given event to the underlying `EventSource`. */ public send(payload: { id?: string; event?: string; data: unknown }): void { + this[kSend]({ + id: payload.id, + event: payload.event, + data: JSON.stringify(payload.data), + }) + } + + /** + * Errors the underlying `EventSource`, closing the connection. + */ + public error(): void { + this.controller.error() + } + + private [kSend](payload: { + id?: string + event?: string + data: string + }): void { const frames: Array = [] if (payload.event) { frames.push(`event:${payload.event}`) } - frames.push(`data:${JSON.stringify(payload.data)}`) + frames.push(`data:${payload.data}`) if (payload.id) { frames.push(`id:${payload.id}`) @@ -113,13 +133,6 @@ class ServerSentEventClient { frames.push('', '') this.controller.enqueue(this.encoder.encode(frames.join('\n'))) } - - /** - * Errors the underlying `EventSource`, closing the connection. - */ - public error(): void { - this.controller.error() - } } const kListener = Symbol('kListener') @@ -181,14 +194,24 @@ class ServerSentEventServer { const { client } = this return function (this: EventSource, event: Event) { - const EventConstructor = event.constructor as Constructor - const cancelableEvent = new EventConstructor(event.type, { - ...event, - cancelable: true, + const cancelableEvent = + event instanceof MessageEvent + ? new MessageEvent(event.type, { + data: event.data, + cancelable: true, + }) + : new Event(event.type, { + cancelable: true, + }) + Object.defineProperties(cancelableEvent, { + target: { enumerable: true, value: this, writable: false }, + currentTarget: { enumerable: true, value: this, writable: false }, + srcElement: { enumerable: true, value: this, writable: false }, }) listener.call(this, cancelableEvent) + // The "open" event cannot be prevented so ignore it. if (event.type === 'open') { return } @@ -202,13 +225,10 @@ class ServerSentEventServer { default: { if (event instanceof MessageEvent) { - client.send({ + // Use the internal send method to avoid data serialization. + client[kSend]({ id: event.lastEventId, event: event.type === 'message' ? undefined : event.type, - /** - * @fixme Data will already be stringified. - * `client.send()` will stringify it again. - */ data: event.data, }) } diff --git a/test/browser/sse-api/sse.node.test.ts b/test/browser/sse-api/sse.node.test.ts index a4607f0f3..c7ed9fb64 100644 --- a/test/browser/sse-api/sse.node.test.ts +++ b/test/browser/sse-api/sse.node.test.ts @@ -169,7 +169,7 @@ test('errors the connected source', async ({ loadExample, page, waitFor }) => { }) }) -test.only('forwards original server message events to the client', async ({ +test('forwards original server message events to the client', async ({ loadExample, page, }) => { @@ -186,7 +186,7 @@ test.only('forwards original server message events to the client', async ({ const source = server.connect() source.addEventListener('message', (event) => { - event.preventDefault() + console.log(event) }) }), ) From 775c2d6918391cdc56571f2bb9a7411df7113d8c Mon Sep 17 00:00:00 2001 From: Artem Zakharchenko Date: Mon, 30 Sep 2024 23:09:32 +0200 Subject: [PATCH 04/17] chore: rename sse test --- test/browser/sse-api/{sse.node.test.ts => sse.test.ts} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename test/browser/sse-api/{sse.node.test.ts => sse.test.ts} (100%) diff --git a/test/browser/sse-api/sse.node.test.ts b/test/browser/sse-api/sse.test.ts similarity index 100% rename from test/browser/sse-api/sse.node.test.ts rename to test/browser/sse-api/sse.test.ts From c53e13e0e4e40fe13a267ff3b771c88e08622d21 Mon Sep 17 00:00:00 2001 From: Artem Zakharchenko Date: Tue, 1 Oct 2024 17:42:55 +0200 Subject: [PATCH 05/17] fix: make forwarding manual --- package.json | 3 +- pnpm-lock.yaml | 64 +++++ src/browser/sse.ts | 117 +++------ .../sse-api/sse.server.connect.test.ts | 233 ++++++++++++++++++ 4 files changed, 329 insertions(+), 88 deletions(-) create mode 100644 test/browser/sse-api/sse.server.connect.test.ts diff --git a/package.json b/package.json index 73dc51602..778d8cbbd 100644 --- a/package.json +++ b/package.json @@ -78,7 +78,7 @@ "test:unit": "vitest", "test:node": "vitest run --config=./test/node/vitest.config.mts", "test:native": "vitest --config=./test/native/vitest.config.mts", - "test:browser": "playwright test -c ./test/browser/playwright.config.ts", + "test:browser": "NODE_OPTIONS=--experimental-require-module npx playwright test -c ./test/browser/playwright.config.ts", "test:modules:node": "vitest --config=./test/modules/node/vitest.config.mts", "test:modules:browser": "playwright test -c ./test/modules/browser/playwright.config.ts", "test:ts": "vitest --typecheck --config=./test/typings/vitest.config.mts", @@ -154,6 +154,7 @@ "devDependencies": { "@commitlint/cli": "^18.4.4", "@commitlint/config-conventional": "^18.4.4", + "@epic-web/test-server": "^0.1.0", "@open-draft/test-server": "^0.4.2", "@ossjs/release": "^0.8.1", "@playwright/test": "^1.40.1", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 611143cee..c4c3597a5 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -64,6 +64,9 @@ devDependencies: '@commitlint/config-conventional': specifier: ^18.4.4 version: 18.6.3 + '@epic-web/test-server': + specifier: ^0.1.0 + version: 0.1.0 '@open-draft/test-server': specifier: ^0.4.2 version: 0.4.2 @@ -677,6 +680,21 @@ packages: '@jridgewell/trace-mapping': 0.3.9 dev: true + /@epic-web/test-server@0.1.0: + resolution: {integrity: sha512-RjJ2WvFepiXynHqm6svriviySVmpCA2QxJmy0DHYwy/9qjeSZvTAIttNqCLLryrV6hL+RsSpHS9CN4MSn/4DpA==} + engines: {node: '>=20'} + dependencies: + '@hono/node-server': 1.13.1(hono@4.6.3) + '@hono/node-ws': 1.0.4(@hono/node-server@1.13.1) + '@open-draft/deferred-promise': 2.2.0 + '@types/ws': 8.5.12 + hono: 4.6.3 + ws: 8.18.0 + transitivePeerDependencies: + - bufferutil + - utf-8-validate + dev: true + /@esbuild/aix-ppc64@0.19.12: resolution: {integrity: sha512-bmoCYyWdEL3wDQIVbcyzRyeKLgk2WtWLTWz1ZIAZF/EGbNOwSA6ew3PftJ1PqMiOOGu0OyFMzG53L0zqIpPeNA==} engines: {node: '>=12'} @@ -1138,6 +1156,28 @@ packages: engines: {node: '>=14'} dev: true + /@hono/node-server@1.13.1(hono@4.6.3): + resolution: {integrity: sha512-TSxE6cT5RHnawbjnveexVN7H2Dpn1YaLxQrCOLCUwD+hFbqbFsnJBgdWcYtASqtWVjA+Qgi8uqFug39GsHjo5A==} + engines: {node: '>=18.14.1'} + peerDependencies: + hono: ^4 + dependencies: + hono: 4.6.3 + dev: true + + /@hono/node-ws@1.0.4(@hono/node-server@1.13.1): + resolution: {integrity: sha512-0j1TMp67U5ym0CIlvPKcKtD0f2ZjaS/EnhOxFLs3bVfV+/4WInBE7hVe2x/7PLEsNIUK9+jVL8lPd28rzTAcZg==} + engines: {node: '>=18.14.1'} + peerDependencies: + '@hono/node-server': ^1.11.1 + dependencies: + '@hono/node-server': 1.13.1(hono@4.6.3) + ws: 8.18.0 + transitivePeerDependencies: + - bufferutil + - utf-8-validate + dev: true + /@humanwhocodes/config-array@0.11.14: resolution: {integrity: sha512-3T8LkOmg45BV5FICb15QQMsyUSWrQ8AygVfC7ZG32zOalnqrilm018ZVCw0eapXux8FtA33q8PSRSstjee3jSg==} engines: {node: '>=10.10.0'} @@ -2134,6 +2174,12 @@ packages: '@types/node': 18.19.28 dev: true + /@types/ws@8.5.12: + resolution: {integrity: sha512-3tPRkv1EtkDpzlgyKyI8pGsGZAGPEaXeu0DOj5DI25Ja91bdAYddYHbADRYVrZMRbfW+1l5YwXVDKohDJNQxkQ==} + dependencies: + '@types/node': 18.19.28 + dev: true + /@types/yargs-parser@21.0.3: resolution: {integrity: sha512-I4q9QU9MQv4oEOz4tAHJtNz1cwuLxn2F3xcc2iV5WdqLPpUnj30aUuxt1mAxYTG+oe8CZMV/+6rU4S4gRDzqtQ==} dev: true @@ -5049,6 +5095,11 @@ packages: parse-passwd: 1.0.0 dev: true + /hono@4.6.3: + resolution: {integrity: sha512-0LeEuBNFeSHGqZ9sNVVgZjB1V5fmhkBSB0hZrpqStSMLOWgfLy0dHOvrjbJh0H2khsjet6rbHfWTHY0kpYThKQ==} + engines: {node: '>=16.9.0'} + dev: true + /hosted-git-info@2.8.9: resolution: {integrity: sha512-mxIDAb9Lsm6DoOJ7xH+5+X4y1LU/4Hi50L9C5sIswK3JzULS4bwk1FvjdBgvYR4bzT4tuUQiC15FE2f5HbLvYw==} dev: true @@ -8672,6 +8723,19 @@ packages: optional: true dev: true + /ws@8.18.0: + resolution: {integrity: sha512-8VbfWfHLbbwu3+N6OKsOMpBdT4kXPDDB9cJk2bJ6mh9ucxdlnNvH1e+roYkKmN9Nxw2yjz7VzeO9oOz2zJ04Pw==} + engines: {node: '>=10.0.0'} + peerDependencies: + bufferutil: ^4.0.1 + utf-8-validate: '>=5.0.2' + peerDependenciesMeta: + bufferutil: + optional: true + utf-8-validate: + optional: true + dev: true + /xml-name-validator@5.0.0: resolution: {integrity: sha512-EvGK8EJ3DhaHfbRlETOWAS5pO9MZITeauHKJyb8wyajUfQUenkIg2MvLDTZ4T/TgIcm3HU0TFBgWWboAZ30UHg==} engines: {node: '>=18'} diff --git a/src/browser/sse.ts b/src/browser/sse.ts index d76c89018..b87576470 100644 --- a/src/browser/sse.ts +++ b/src/browser/sse.ts @@ -47,7 +47,6 @@ class ServerSentEventHandler extends HttpHandler { }) const server = new ServerSentEventServer({ request: info.request, - client, }) resolver({ @@ -96,7 +95,7 @@ class ServerSentEventClient { } /** - * Dispatches the given event to the underlying `EventSource`. + * Sends the given payload to the underlying `EventSource`. */ public send(payload: { id?: string; event?: string; data: unknown }): void { this[kSend]({ @@ -106,6 +105,29 @@ class ServerSentEventClient { }) } + /** + * Dispatches the given event to the underlying `EventSource`. + */ + public dispatchEvent(event: Event) { + if (event instanceof MessageEvent) { + /** + * @note Use the internal send mechanism to skip normalization + * of the message data (already normalized by the server). + */ + this[kSend]({ + id: event.lastEventId || undefined, + event: event.type === 'message' ? undefined : event.type, + data: event.data, + }) + return + } + + if (event.type === 'error') { + this.error() + return + } + } + /** * Errors the underlying `EventSource`, closing the connection. */ @@ -135,15 +157,11 @@ class ServerSentEventClient { } } -const kListener = Symbol('kListener') - class ServerSentEventServer { protected request: Request - protected client: ServerSentEventClient - constructor(args: { request: Request; client: ServerSentEventClient }) { + constructor(args: { request: Request }) { this.request = args.request - this.client = args.client } /** @@ -152,91 +170,16 @@ class ServerSentEventServer { */ public connect(): EventSource { const requestUrl = new URL(this.request.url) + /** + * @todo @fixme Explore if there's a different way to bypass this request. + * It has to be bypassed not to cause an infinite loop of it being intercepted. + */ requestUrl.searchParams.set('x-msw-intention', 'bypass') - let source = new EventSource(requestUrl, { + const source = new EventSource(requestUrl, { withCredentials: this.request.credentials === 'include', }) - const addEventListener = source.addEventListener.bind(source) - source.addEventListener = (event: any, listener: any, options: any) => { - const wrappedListener = this.wrapEventListener(listener).bind(source) - Object.defineProperty(listener, kListener, { - value: wrappedListener, - }) - addEventListener(event, wrappedListener, options) - } - - const removeEventListener = source.removeEventListener.bind(source) - source.removeEventListener = (event: any, listener: any, options: any) => { - const wrappedListener = listener[kListener] || listener - removeEventListener(event, wrappedListener, options) - } - - source = new Proxy(source, { - set: (target, property, value) => { - switch (property) { - case 'onopen': - case 'onmessage': - case 'onerror': { - return Reflect.set(target, property, this.wrapEventListener(value)) - } - } - - return Reflect.set(target, property, value) - }, - }) - return source } - - private wrapEventListener(listener: (event: Event) => void) { - const { client } = this - - return function (this: EventSource, event: Event) { - const cancelableEvent = - event instanceof MessageEvent - ? new MessageEvent(event.type, { - data: event.data, - cancelable: true, - }) - : new Event(event.type, { - cancelable: true, - }) - Object.defineProperties(cancelableEvent, { - target: { enumerable: true, value: this, writable: false }, - currentTarget: { enumerable: true, value: this, writable: false }, - srcElement: { enumerable: true, value: this, writable: false }, - }) - - listener.call(this, cancelableEvent) - - // The "open" event cannot be prevented so ignore it. - if (event.type === 'open') { - return - } - - if (!cancelableEvent.defaultPrevented) { - switch (event.type) { - case 'error': { - client.error() - break - } - - default: { - if (event instanceof MessageEvent) { - // Use the internal send method to avoid data serialization. - client[kSend]({ - id: event.lastEventId, - event: event.type === 'message' ? undefined : event.type, - data: event.data, - }) - } - - break - } - } - } - } - } } diff --git a/test/browser/sse-api/sse.server.connect.test.ts b/test/browser/sse-api/sse.server.connect.test.ts new file mode 100644 index 000000000..07ea0ad89 --- /dev/null +++ b/test/browser/sse-api/sse.server.connect.test.ts @@ -0,0 +1,233 @@ +import { setupWorker, sse } from 'msw/browser' +import { createTestHttpServer } from '@epic-web/test-server/http' +import { test, expect } from '../playwright.extend' + +declare namespace window { + export const msw: { + setupWorker: typeof setupWorker + sse: typeof sse + } +} + +test('makes the actual request when called "server.connect()"', async ({ + loadExample, + page, +}) => { + await loadExample(require.resolve('./sse.mocks.ts'), { + skipActivation: true, + }) + + await using server = await createTestHttpServer({ + defineRoutes(routes) { + routes.get('/stream', () => { + return new Response(null, { + headers: { + 'content-type': 'text/event-stream', + 'cache-control': 'no-cache', + connection: 'keep-alive', + }, + }) + }) + }, + }) + const url = server.http.url('/stream').href + + await page.evaluate(async (url) => { + const { setupWorker, sse } = window.msw + + const worker = setupWorker( + sse(url, ({ server }) => { + // Calling "server.connect()" establishes the actual connection. + server.connect() + }), + ) + await worker.start() + }, url) + + const openPromise = page.evaluate((url) => { + return new Promise((resolve, reject) => { + const source = new EventSource(url) + source.onopen = () => resolve() + source.onerror = () => reject(new Error('EventSource connection failed')) + }) + }, url) + + await expect(openPromise).resolves.toBeUndefined() +}) + +test('forwards message event from the original server to the client', async ({ + loadExample, + page, +}) => { + await loadExample(require.resolve('./sse.mocks.ts'), { + skipActivation: true, + }) + + await using server = await createTestHttpServer({ + defineRoutes(routes) { + routes.get('/stream', () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue( + new TextEncoder().encode('data: {"message": "hello"}\n\n'), + ) + }, + }) + + return new Response(stream, { + headers: { + 'access-control-allow-origin': '*', + 'content-type': 'text/event-stream', + 'cache-control': 'no-cache', + connection: 'keep-alive', + }, + }) + }) + }, + }) + const url = server.http.url('/stream').href + + await page.evaluate(async (url) => { + const { setupWorker, sse } = window.msw + + const worker = setupWorker( + sse(url, ({ client, server }) => { + const source = server.connect() + source.addEventListener('message', (event) => { + client.dispatchEvent(event) + }) + }), + ) + await worker.start() + }, url) + + const message = await page.evaluate((url) => { + return new Promise((resolve, reject) => { + const source = new EventSource(url) + source.addEventListener('message', (event) => { + resolve(JSON.parse(event.data)) + }) + source.onerror = () => reject(new Error('EventSource connection failed')) + }) + }, url) + + await page.pause() + + expect(message).toEqual({ message: 'hello' }) +}) + +test('forwards custom event from the original server to the client', async ({ + loadExample, + page, +}) => { + await loadExample(require.resolve('./sse.mocks.ts'), { + skipActivation: true, + }) + + await using server = await createTestHttpServer({ + defineRoutes(routes) { + routes.get('/stream', () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue( + new TextEncoder().encode( + 'event: custom\ndata: {"message": "hello"}\n\n', + ), + ) + }, + }) + + return new Response(stream, { + headers: { + 'access-control-allow-origin': '*', + 'content-type': 'text/event-stream', + 'cache-control': 'no-cache', + connection: 'keep-alive', + }, + }) + }) + }, + }) + const url = server.http.url('/stream').href + + await page.evaluate(async (url) => { + const { setupWorker, sse } = window.msw + + const worker = setupWorker( + sse(url, ({ client, server }) => { + const source = server.connect() + source.addEventListener('custom', (event) => { + client.dispatchEvent(event) + }) + }), + ) + await worker.start() + }, url) + + const message = await page.evaluate((url) => { + return new Promise((resolve, reject) => { + const source = new EventSource(url) + source.addEventListener('custom', (event) => { + resolve(JSON.parse(event.data)) + }) + source.onerror = () => reject(new Error('EventSource connection failed')) + }) + }, url) + + await page.pause() + + expect(message).toEqual({ message: 'hello' }) +}) + +test('forwards the original server error to the client', async ({ + loadExample, + page, +}) => { + await loadExample(require.resolve('./sse.mocks.ts'), { + skipActivation: true, + }) + + await using server = await createTestHttpServer({ + defineRoutes(routes) { + routes.get('/stream', () => { + const stream = new ReadableStream({ + start(controller) { + controller.error() + }, + }) + + return new Response(stream, { + headers: { + 'content-type': 'text/event-stream', + 'cache-control': 'no-cache', + connection: 'keep-alive', + }, + }) + }) + }, + }) + const url = server.http.url('/stream').href + + await page.evaluate(async (url) => { + const { setupWorker, sse } = window.msw + + const worker = setupWorker( + sse(url, ({ client, server }) => { + const source = server.connect() + source.addEventListener('error', (event) => { + client.dispatchEvent(event) + }) + }), + ) + await worker.start() + }, url) + + const errorPromise = page.evaluate((url) => { + return new Promise((resolve) => { + const source = new EventSource(url) + source.onerror = () => resolve() + }) + }, url) + + await expect(errorPromise).resolves.toBeUndefined() +}) From 2edf2a6a8da39e9f59f3310449062c605ad550cf Mon Sep 17 00:00:00 2001 From: Artem Zakharchenko Date: Tue, 1 Oct 2024 17:55:26 +0200 Subject: [PATCH 06/17] chore: use node v20 for ci --- .github/workflows/ci.yml | 2 +- .github/workflows/release.yml | 2 +- .github/workflows/typescript-nightly.yml | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d0fa59640..238d0356a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,7 +19,7 @@ jobs: - name: Set up Node.js uses: actions/setup-node@v3 with: - node-version: 18 + node-version: 20 - uses: pnpm/action-setup@v4 with: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 7ed8349f9..3fb85b0e1 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -18,7 +18,7 @@ jobs: - name: Setup Node.js uses: actions/setup-node@v3 with: - node-version: 18 + node-version: 20 always-auth: true registry-url: https://registry.npmjs.org diff --git a/.github/workflows/typescript-nightly.yml b/.github/workflows/typescript-nightly.yml index 9449cd3ba..031f0c64d 100644 --- a/.github/workflows/typescript-nightly.yml +++ b/.github/workflows/typescript-nightly.yml @@ -13,7 +13,7 @@ jobs: - name: Set up Node.js uses: actions/setup-node@v3 with: - node-version: 18 + node-version: 20 - name: Get latest TypeScript version id: get-versions @@ -42,7 +42,7 @@ jobs: - name: Set up Node.js uses: actions/setup-node@v3 with: - node-version: 18 + node-version: 20 - name: Set up pnpm uses: pnpm/action-setup@v4 From 81cc17e251deca0471a7920dc5d4967b8c84c323 Mon Sep 17 00:00:00 2001 From: Artem Zakharchenko Date: Tue, 1 Oct 2024 18:01:07 +0200 Subject: [PATCH 07/17] test: add "withCredentials" test --- .../sse-api/sse.with-credentials.test.ts | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 test/browser/sse-api/sse.with-credentials.test.ts diff --git a/test/browser/sse-api/sse.with-credentials.test.ts b/test/browser/sse-api/sse.with-credentials.test.ts new file mode 100644 index 000000000..bc6c1c4bc --- /dev/null +++ b/test/browser/sse-api/sse.with-credentials.test.ts @@ -0,0 +1,53 @@ +import { setupWorker, sse } from 'msw/browser' +import { test, expect } from '../playwright.extend' + +declare namespace window { + export const msw: { + setupWorker: typeof setupWorker + sse: typeof sse + } +} + +test('forwards document cookies on the request when "withCredentials" is set to true', async ({ + loadExample, + page, + spyOnConsole, +}) => { + const consoleSpy = spyOnConsole() + await loadExample(require.resolve('./sse.mocks.ts'), { + skipActivation: true, + }) + + await page.evaluate(() => { + document.cookie = 'foo=bar' + }) + + await page.evaluate(async () => { + const { setupWorker, sse } = window.msw + + const worker = setupWorker( + sse('http://localhost/stream', ({ cookies }) => { + console.log(JSON.stringify({ cookies })) + }), + ) + await worker.start() + }) + + await page.evaluate((url) => { + return new Promise((resolve, reject) => { + const source = new EventSource('http://localhost/stream', { + withCredentials: true, + }) + source.onopen = () => resolve() + source.onerror = () => reject(new Error('EventSource connection failed')) + }) + }) + + expect(consoleSpy.get('log')).toContain( + JSON.stringify({ + cookies: { + foo: 'bar', + }, + }), + ) +}) From 122661205a7074c550e2b8c5f9111bd0cafbfbfd Mon Sep 17 00:00:00 2001 From: Artem Zakharchenko Date: Tue, 1 Oct 2024 18:02:55 +0200 Subject: [PATCH 08/17] fix: throw if EventSource is undefined --- src/browser/sse.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/browser/sse.ts b/src/browser/sse.ts index b87576470..95400413c 100644 --- a/src/browser/sse.ts +++ b/src/browser/sse.ts @@ -1,3 +1,4 @@ +import { invariant } from 'outvariant' import type { ResponseResolver } from '~/core/handlers/RequestHandler' import { HttpHandler, @@ -39,6 +40,12 @@ export const sse: ServerSentEventRequestHandler = (path, resolver) => { class ServerSentEventHandler extends HttpHandler { constructor(path: Path, resolver: ServerSentEventResolver) { + invariant( + typeof EventSource !== 'undefined', + 'Failed to construct a Server-Sent Event handler for path "%s": your environment does not support the EventSource API', + path, + ) + super('GET', path, (info) => { const stream = new ReadableStream({ start(controller) { From 4c49a14d2d75fffed0f13909fae9b80d95e05a40 Mon Sep 17 00:00:00 2001 From: Artem Zakharchenko Date: Tue, 1 Oct 2024 18:30:10 +0200 Subject: [PATCH 09/17] feat: support `EventMap` type argument --- src/browser/sse.ts | 49 +++++++++++++------ .../{sse.test.ts => sse.client.send.test.ts} | 0 test/typings/sse.test-d.ts | 39 +++++++++++++++ test/typings/vitest.config.mts | 4 +- 4 files changed, 75 insertions(+), 17 deletions(-) rename test/browser/sse-api/{sse.test.ts => sse.client.send.test.ts} (100%) create mode 100644 test/typings/sse.test-d.ts diff --git a/src/browser/sse.ts b/src/browser/sse.ts index 95400413c..7a1cbf3e4 100644 --- a/src/browser/sse.ts +++ b/src/browser/sse.ts @@ -7,21 +7,26 @@ import { } from '~/core/handlers/HttpHandler' import type { Path, PathParams } from '~/core/utils/matching/matchRequestUrl' -export type ServerSentEventResolverExtras = - HttpRequestResolverExtras & { - client: ServerSentEventClient - server: ServerSentEventServer - } +export type ServerSentEventResolverExtras< + EventMap extends Record, + Params extends PathParams, +> = HttpRequestResolverExtras & { + client: ServerSentEventClient + server: ServerSentEventServer +} -export type ServerSentEventResolver = - ResponseResolver, any, any> +export type ServerSentEventResolver< + EventMap extends Record, + Params extends PathParams, +> = ResponseResolver, any, any> export type ServerSentEventRequestHandler = < + EventMap extends Record, Params extends PathParams = PathParams, RequestPath extends Path = Path, >( path: RequestPath, - resolver: ServerSentEventResolver, + resolver: ServerSentEventResolver, ) => HttpHandler /** @@ -38,8 +43,10 @@ export const sse: ServerSentEventRequestHandler = (path, resolver) => { return new ServerSentEventHandler(path, resolver) } -class ServerSentEventHandler extends HttpHandler { - constructor(path: Path, resolver: ServerSentEventResolver) { +class ServerSentEventHandler< + EventMap extends Record, +> extends HttpHandler { + constructor(path: Path, resolver: ServerSentEventResolver) { invariant( typeof EventSource !== 'undefined', 'Failed to construct a Server-Sent Event handler for path "%s": your environment does not support the EventSource API', @@ -49,7 +56,7 @@ class ServerSentEventHandler extends HttpHandler { super('GET', path, (info) => { const stream = new ReadableStream({ start(controller) { - const client = new ServerSentEventClient({ + const client = new ServerSentEventClient({ controller, }) const server = new ServerSentEventServer({ @@ -92,7 +99,7 @@ class ServerSentEventHandler extends HttpHandler { const kSend = Symbol('kSend') -class ServerSentEventClient { +class ServerSentEventClient> { private encoder: TextEncoder protected controller: ReadableStreamDefaultController @@ -104,7 +111,19 @@ class ServerSentEventClient { /** * Sends the given payload to the underlying `EventSource`. */ - public send(payload: { id?: string; event?: string; data: unknown }): void { + public send( + payload: + | { + id?: string + event: EventType + data: EventMap[EventType] + } + | { + id?: string + event?: never + data?: EventMap['message'] + }, + ): void { this[kSend]({ id: payload.id, event: payload.event, @@ -142,9 +161,9 @@ class ServerSentEventClient { this.controller.error() } - private [kSend](payload: { + private [kSend](payload: { id?: string - event?: string + event?: EventType data: string }): void { const frames: Array = [] diff --git a/test/browser/sse-api/sse.test.ts b/test/browser/sse-api/sse.client.send.test.ts similarity index 100% rename from test/browser/sse-api/sse.test.ts rename to test/browser/sse-api/sse.client.send.test.ts diff --git a/test/typings/sse.test-d.ts b/test/typings/sse.test-d.ts new file mode 100644 index 000000000..59ce96238 --- /dev/null +++ b/test/typings/sse.test-d.ts @@ -0,0 +1,39 @@ +import { it } from 'vitest' +import { sse } from 'msw/browser' + +it('supports custom event map type argument', () => { + sse<{ myevent: string }>('/stream', ({ client }) => { + client.send({ + event: 'myevent', + data: 'hello', + }) + + client.send({ + // @ts-expect-error Unknown event type "unknown". + event: 'unknown', + data: 'hello', + }) + }) +}) + +it('supports event map type argument for unnamed events', () => { + sse<{ message: number; custom: 'goodbye' }>('/stream', ({ client }) => { + client.send({ + data: 123, + }) + client.send({ + // @ts-expect-error Unexpected data type for "message" event. + data: 'goodbye', + }) + + client.send({ + event: 'custom', + data: 'goodbye', + }) + client.send({ + event: 'custom', + // @ts-expect-error Unexpected data type for "custom" event. + data: 'invalid', + }) + }) +}) diff --git a/test/typings/vitest.config.mts b/test/typings/vitest.config.mts index 4f879ab8e..b5db2bcc8 100644 --- a/test/typings/vitest.config.mts +++ b/test/typings/vitest.config.mts @@ -1,8 +1,8 @@ import * as path from 'node:path' +import * as fs from 'fs' import { defineConfig } from 'vitest/config' -import tsPackageJson from 'typescript/package.json' assert { type: 'json' } import { invariant } from 'outvariant' -import * as fs from 'fs' +import tsPackageJson from 'typescript/package.json' assert { type: 'json' } const LIB_DIR = path.resolve(__dirname, '../../lib') From 4471412bc98b327c17a3063fe626a6428ce3a338 Mon Sep 17 00:00:00 2001 From: Artem Zakharchenko Date: Wed, 2 Oct 2024 15:09:48 +0200 Subject: [PATCH 10/17] feat: support server-to-client forwarding --- .nvmrc | 2 +- src/browser/sse.ts | 488 +++++++++++++++++- src/browser/tsconfig.browser.json | 2 +- src/core/utils/handleRequest.ts | 5 +- .../sse-api/sse.server.connect.test.ts | 23 +- 5 files changed, 489 insertions(+), 31 deletions(-) diff --git a/.nvmrc b/.nvmrc index 3462e8c26..92f279e3e 100644 --- a/.nvmrc +++ b/.nvmrc @@ -1 +1 @@ -v18.19.0 \ No newline at end of file +v22 \ No newline at end of file diff --git a/src/browser/sse.ts b/src/browser/sse.ts index 7a1cbf3e4..90a103a59 100644 --- a/src/browser/sse.ts +++ b/src/browser/sse.ts @@ -6,6 +6,7 @@ import { type HttpRequestParsedResult, } from '~/core/handlers/HttpHandler' import type { Path, PathParams } from '~/core/utils/matching/matchRequestUrl' +import { delay } from '~/core/delay' export type ServerSentEventResolverExtras< EventMap extends Record, @@ -61,6 +62,7 @@ class ServerSentEventHandler< }) const server = new ServerSentEventServer({ request: info.request, + client, }) resolver({ @@ -184,10 +186,12 @@ class ServerSentEventClient> { } class ServerSentEventServer { - protected request: Request + private request: Request + private client: ServerSentEventClient - constructor(args: { request: Request }) { + constructor(args: { request: Request; client: ServerSentEventClient }) { this.request = args.request + this.client = args.client } /** @@ -195,17 +199,481 @@ class ServerSentEventServer { * and returns the `EventSource` instance. */ public connect(): EventSource { - const requestUrl = new URL(this.request.url) - /** - * @todo @fixme Explore if there's a different way to bypass this request. - * It has to be bypassed not to cause an infinite loop of it being intercepted. - */ - requestUrl.searchParams.set('x-msw-intention', 'bypass') - - const source = new EventSource(requestUrl, { + const source = new ObservableEventSource(this.request.url, { withCredentials: this.request.credentials === 'include', + headers: { + /** + * @note Mark this request as bypassed so it doesn't trigger + * an infinite loop matching the existing request handler. + */ + 'x-msw-intention': 'bypass', + }, }) + source[kOnAnyMessage] = (event) => { + // Schedule the server-to-client forwarding for the next tick + // so the user can prevent the message event. + queueMicrotask(() => { + if (!event.defaultPrevented) { + this.client.dispatchEvent(event) + } + }) + } + return source } } + +interface ObservableEventSourceInit extends EventSourceInit { + headers?: HeadersInit +} + +type EventHandler = (this: EventSource, event: E) => any + +const kRequest = Symbol('kRequest') +const kReconnectionTime = Symbol('kReconnectionTime') +const kLastEventId = Symbol('kLastEventId') +const kAbortController = Symbol('kAbortController') +const kOnOpen = Symbol('kOnOpen') +const kOnMessage = Symbol('kOnMessage') +const kOnAnyMessage = Symbol('kOnAnyMessage') +const kOnError = Symbol('kOnError') + +class ObservableEventSource extends EventTarget implements EventSource { + static readonly CONNECTING = 0 + static readonly OPEN = 1 + static readonly CLOSED = 2 + + public readonly CONNECTING = ObservableEventSource.CONNECTING + public readonly OPEN = ObservableEventSource.OPEN + public readonly CLOSED = ObservableEventSource.CLOSED + + public readyState: number + public url: string + public withCredentials: boolean + + private [kRequest]: Request + private [kReconnectionTime]: number + private [kLastEventId]: string + private [kAbortController]: AbortController + private [kOnOpen]: EventHandler | null = null + private [kOnMessage]: EventHandler | null = null + private [kOnAnyMessage]: EventHandler | null = null + private [kOnError]: EventHandler | null = null + + constructor(url: string | URL, init?: ObservableEventSourceInit) { + super() + + this.url = new URL(url).href + this.withCredentials = init?.withCredentials ?? false + + this.readyState = this.CONNECTING + + // Support custom request init. + const headers = new Headers(init?.headers || {}) + headers.set('accept', 'text/event-stream') + + this[kAbortController] = new AbortController() + this[kReconnectionTime] = 2000 + this[kLastEventId] = '' + this[kRequest] = new Request(this.url, { + method: 'GET', + headers, + credentials: this.withCredentials ? 'include' : 'omit', + signal: this[kAbortController].signal, + }) + + this.connect() + } + + get onopen(): EventHandler | null { + return this[kOnOpen] + } + + set onopen(handler: EventHandler) { + if (this[kOnOpen]) { + this.removeEventListener('open', this[kOnOpen]) + } + this[kOnOpen] = handler.bind(this) + this.addEventListener('open', this[kOnOpen]) + } + + get onmessage(): EventHandler | null { + return this[kOnMessage] + } + set onmessage(handler: EventHandler) { + if (this[kOnMessage]) { + this.removeEventListener('message', { handleEvent: this[kOnMessage] }) + } + this[kOnMessage] = handler.bind(this) + this.addEventListener('message', { handleEvent: this[kOnMessage] }) + } + + get onerror(): EventHandler | null { + return this[kOnError] + } + set oneerror(handler: EventHandler) { + if (this[kOnError]) { + this.removeEventListener('error', { handleEvent: this[kOnError] }) + } + this[kOnError] = handler.bind(this) + this.addEventListener('error', { handleEvent: this[kOnError] }) + } + + public addEventListener( + type: K, + listener: EventHandler, + options?: boolean | AddEventListenerOptions, + ): void + public addEventListener( + type: string, + listener: EventHandler, + options?: boolean | AddEventListenerOptions, + ): void + public addEventListener( + type: string, + listener: EventListenerOrEventListenerObject, + options?: boolean | AddEventListenerOptions, + ): void + + public addEventListener( + type: string, + listener: EventHandler | EventListenerOrEventListenerObject, + options?: boolean | AddEventListenerOptions, + ): void { + super.addEventListener( + type, + listener as EventListenerOrEventListenerObject, + options, + ) + } + + public removeEventListener( + type: K, + listener: (this: EventSource, ev: EventSourceEventMap[K]) => any, + options?: boolean | EventListenerOptions, + ): void + public removeEventListener( + type: string, + listener: (this: EventSource, event: MessageEvent) => any, + options?: boolean | EventListenerOptions, + ): void + public removeEventListener( + type: string, + listener: EventListenerOrEventListenerObject, + options?: boolean | EventListenerOptions, + ): void + + public removeEventListener( + type: string, + listener: EventHandler | EventListenerOrEventListenerObject, + options?: boolean | AddEventListenerOptions, + ): void { + super.removeEventListener( + type, + listener as EventListenerOrEventListenerObject, + options, + ) + } + + public dispatchEvent(event: Event): boolean { + if (this.readyState === this.CLOSED) { + return false + } + + return super.dispatchEvent(event) + } + + public close(): void { + this[kAbortController].abort() + this.readyState = this.CLOSED + } + + private async connect() { + const response = await fetch(this[kRequest]) + this.processResponse(response) + } + + private processResponse(response: Response): void { + if (!response.body) { + this.failConnection() + return + } + + if (isNetworkError(response)) { + this.reestablishConnection() + return + } + + if ( + response.status !== 200 || + response.headers.get('content-type') !== 'text/event-stream' + ) { + this.failConnection() + return + } + + this.announceConnection() + this.interpretResponseBody(response) + } + + private announceConnection(): void { + queueMicrotask(() => { + if (this.readyState !== this.CLOSED) { + this.readyState = this.OPEN + this.dispatchEvent(new Event('open')) + } + }) + } + + private interpretResponseBody(response: Response): void { + const parsingStream = new EventSourceParsingStream({ + message: (message) => { + if (message.id) { + this[kLastEventId] = message.id + } + + if (message.retry) { + this[kReconnectionTime] = message.retry + } + + const messageEvent = new MessageEvent( + message.event ? message.event : 'message', + { + data: message.data, + origin: this[kRequest].url, + lastEventId: this[kLastEventId], + cancelable: true, + }, + ) + + this[kOnAnyMessage]?.(messageEvent) + this.dispatchEvent(messageEvent) + }, + }) + + response.body!.pipeTo(parsingStream).then(() => { + this.processResponseEndOfBody(response) + }) + } + + private processResponseEndOfBody(response: Response): void { + if (!isNetworkError(response)) { + this.reestablishConnection() + } + } + + private async reestablishConnection(): Promise { + queueMicrotask(() => { + if (this.readyState === this.CLOSED) { + return + } + + this.readyState = this.CONNECTING + this.dispatchEvent(new Event('error')) + }) + + await delay(this[kReconnectionTime]) + + queueMicrotask(async () => { + if (this.readyState !== this.CONNECTING) { + return + } + + if (this[kLastEventId] !== '') { + this[kRequest].headers.set('last-event-id', this[kLastEventId]) + } + + await this.connect() + }) + } + + private failConnection(): void { + queueMicrotask(() => { + if (this.readyState !== this.CLOSED) { + this.readyState = this.CLOSED + this.dispatchEvent(new Event('error')) + } + }) + } +} + +/** + * Checks if the given `Response` instance is a network error. + * @see https://fetch.spec.whatwg.org/#concept-network-error + */ +function isNetworkError(response: Response): boolean { + return ( + response.type === 'error' && + response.status === 0 && + response.statusText === '' && + Array.from(response.headers.entries()).length === 0 && + response.body === null + ) +} + +const enum ControlCharacters { + NewLine = 10, + CarriageReturn = 13, + Space = 32, + Colon = 58, +} + +interface EventSourceMessage { + id?: string + event?: string + data?: string + retry?: number +} + +class EventSourceParsingStream extends WritableStream { + private decoder: TextDecoder + + private buffer?: Uint8Array + private position: number + private fieldLength?: number + private discardTrailingNewline = false + + private message: EventSourceMessage = { + id: undefined, + event: undefined, + data: undefined, + retry: undefined, + } + + constructor( + private underlyingSink: { + message: (message: EventSourceMessage) => void + }, + ) { + super({ + write: (chunk) => { + this.processResponseBodyChunk(chunk) + }, + }) + + this.decoder = new TextDecoder() + this.position = 0 + } + + private resetMessage(): void { + this.message = { + id: undefined, + event: undefined, + data: undefined, + retry: undefined, + } + } + + private processResponseBodyChunk(chunk: Uint8Array): void { + if (this.buffer == null) { + this.buffer = chunk + this.position = 0 + this.fieldLength = -1 + } else { + const nextBuffer = new Uint8Array(this.buffer.length + chunk.length) + nextBuffer.set(this.buffer) + nextBuffer.set(chunk, this.buffer.length) + this.buffer = nextBuffer + } + + const bufferLength = this.buffer.length + let lineStart = 0 + + while (this.position < bufferLength) { + if (this.discardTrailingNewline) { + if (this.buffer[this.position] === ControlCharacters.NewLine) { + lineStart = ++this.position + } + + this.discardTrailingNewline = false + } + + let lineEnd = -1 + + for (; this.position < bufferLength && lineEnd === -1; ++this.position) { + switch (this.buffer[this.position]) { + case ControlCharacters.Colon: { + if (this.fieldLength === -1) { + this.fieldLength = this.position - lineStart + } + break + } + + case ControlCharacters.CarriageReturn: { + this.discardTrailingNewline = true + } + case ControlCharacters.NewLine: { + lineEnd = this.position + break + } + } + } + + if (lineEnd === -1) { + break + } + + this.processLine( + this.buffer.subarray(lineStart, lineEnd), + this.fieldLength!, + ) + + lineStart = this.position + this.fieldLength = -1 + } + + if (lineStart === bufferLength) { + this.buffer = undefined + } else if (lineStart !== 0) { + this.buffer = this.buffer.subarray(lineStart) + this.position -= lineStart + } + } + + private processLine(line: Uint8Array, fieldLength: number): void { + // New line indicates the end of the message. Dispatch it. + if (line.length === 0) { + this.underlyingSink.message(this.message) + this.resetMessage() + return + } + + // Otherwise, keep accumulating message fields until the new line. + if (fieldLength > 0) { + const field = this.decoder.decode(line.subarray(0, fieldLength)) + const valueOffset = + fieldLength + + (line[fieldLength + 1] === ControlCharacters.Space ? 2 : 1) + const value = this.decoder.decode(line.subarray(valueOffset)) + + switch (field) { + case 'data': { + this.message.data = this.message.data + ? this.message.data + '\n' + value + : value + break + } + + case 'event': { + this.message.event = value + break + } + + case 'id': { + this.message.id = value + break + } + + case 'retry': { + const retry = parseInt(value, 10) + + if (!isNaN(retry)) { + this.message.retry = retry + } + break + } + } + } + } +} diff --git a/src/browser/tsconfig.browser.json b/src/browser/tsconfig.browser.json index e998a8e8e..6a44514da 100644 --- a/src/browser/tsconfig.browser.json +++ b/src/browser/tsconfig.browser.json @@ -3,7 +3,7 @@ "compilerOptions": { // Expose browser-specific libraries only for the // source code under the "src/browser" directory. - "lib": ["DOM", "WebWorker"] + "lib": ["DOM", "WebWorker", "DOM.Iterable"] }, "include": ["../../global.d.ts", "./global.browser.d.ts", "./**/*.ts"] } diff --git a/src/core/utils/handleRequest.ts b/src/core/utils/handleRequest.ts index ab2fcb88b..766b22ce6 100644 --- a/src/core/utils/handleRequest.ts +++ b/src/core/utils/handleRequest.ts @@ -53,10 +53,7 @@ export async function handleRequest( emitter.emit('request:start', { request, requestId }) // Perform bypassed requests (i.e. wrapped in "bypass()") as-is. - if ( - request.headers.get('x-msw-intention') === 'bypass' || - new URL(request.url).searchParams.get('x-msw-intention') === 'bypass' - ) { + if (request.headers.get('x-msw-intention') === 'bypass') { emitter.emit('request:end', { request, requestId }) handleRequestOptions?.onPassthroughResponse?.(request) return diff --git a/test/browser/sse-api/sse.server.connect.test.ts b/test/browser/sse-api/sse.server.connect.test.ts index 07ea0ad89..f031bab3b 100644 --- a/test/browser/sse-api/sse.server.connect.test.ts +++ b/test/browser/sse-api/sse.server.connect.test.ts @@ -55,7 +55,7 @@ test('makes the actual request when called "server.connect()"', async ({ await expect(openPromise).resolves.toBeUndefined() }) -test('forwards message event from the original server to the client', async ({ +test('forwards message event from the server to the client automatically', async ({ loadExample, page, }) => { @@ -92,10 +92,7 @@ test('forwards message event from the original server to the client', async ({ const worker = setupWorker( sse(url, ({ client, server }) => { - const source = server.connect() - source.addEventListener('message', (event) => { - client.dispatchEvent(event) - }) + server.connect() }), ) await worker.start() @@ -105,6 +102,8 @@ test('forwards message event from the original server to the client', async ({ return new Promise((resolve, reject) => { const source = new EventSource(url) source.addEventListener('message', (event) => { + console.log('client got:', event.type, event.data) + resolve(JSON.parse(event.data)) }) source.onerror = () => reject(new Error('EventSource connection failed')) @@ -116,7 +115,7 @@ test('forwards message event from the original server to the client', async ({ expect(message).toEqual({ message: 'hello' }) }) -test('forwards custom event from the original server to the client', async ({ +test('forwards custom event from the server to the client automatically', async ({ loadExample, page, }) => { @@ -155,10 +154,7 @@ test('forwards custom event from the original server to the client', async ({ const worker = setupWorker( sse(url, ({ client, server }) => { - const source = server.connect() - source.addEventListener('custom', (event) => { - client.dispatchEvent(event) - }) + server.connect() }), ) await worker.start() @@ -179,7 +175,7 @@ test('forwards custom event from the original server to the client', async ({ expect(message).toEqual({ message: 'hello' }) }) -test('forwards the original server error to the client', async ({ +test('forwards error event from the server to the client automatically', async ({ loadExample, page, }) => { @@ -213,10 +209,7 @@ test('forwards the original server error to the client', async ({ const worker = setupWorker( sse(url, ({ client, server }) => { - const source = server.connect() - source.addEventListener('error', (event) => { - client.dispatchEvent(event) - }) + server.connect() }), ) await worker.start() From 3fdf314ba3360c7857f176c7fec97fc2e38be31b Mon Sep 17 00:00:00 2001 From: Artem Zakharchenko Date: Fri, 4 Oct 2024 15:19:30 +0200 Subject: [PATCH 11/17] test(axios-upload): wrap request promise in expect --- test/node/third-party/axios-upload.node.test.ts | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/test/node/third-party/axios-upload.node.test.ts b/test/node/third-party/axios-upload.node.test.ts index e63e352bb..f7f2c068a 100644 --- a/test/node/third-party/axios-upload.node.test.ts +++ b/test/node/third-party/axios-upload.node.test.ts @@ -45,12 +45,13 @@ it('responds with a mocked response to an upload request', async () => { const file = new Blob(['Hello', 'world'], { type: 'text/plain' }) formData.set('file', file, 'doc.txt') - const response = await request.post('/upload', formData) - - expect(response.data).toEqual({ - message: 'Successfully uploaded "doc.txt"!', - content: 'Helloworld', + await expect(request.post('/upload', formData)).resolves.toMatchObject({ + data: { + message: 'Successfully uploaded "doc.txt"!', + content: 'Helloworld', + }, }) + expect(onUploadProgress.mock.calls.length).toBeGreaterThan(0) expect(onUploadProgress).toHaveBeenNthCalledWith( 1, From b11ba9b06b33bd20828f2213b6f58262a88d8936 Mon Sep 17 00:00:00 2001 From: Artem Zakharchenko Date: Fri, 4 Oct 2024 15:33:05 +0200 Subject: [PATCH 12/17] fix: skip stringifying non-objects --- src/browser/sse.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/browser/sse.ts b/src/browser/sse.ts index 90a103a59..4f7987bb1 100644 --- a/src/browser/sse.ts +++ b/src/browser/sse.ts @@ -103,7 +103,7 @@ const kSend = Symbol('kSend') class ServerSentEventClient> { private encoder: TextEncoder - protected controller: ReadableStreamDefaultController + private controller: ReadableStreamDefaultController constructor(args: { controller: ReadableStreamDefaultController }) { this.encoder = new TextEncoder() @@ -129,7 +129,10 @@ class ServerSentEventClient> { this[kSend]({ id: payload.id, event: payload.event, - data: JSON.stringify(payload.data), + data: + typeof payload.data === 'object' + ? JSON.stringify(payload.data) + : payload.data, }) } @@ -166,7 +169,7 @@ class ServerSentEventClient> { private [kSend](payload: { id?: string event?: EventType - data: string + data: string | EventMap[EventType] | EventMap['message'] | undefined }): void { const frames: Array = [] From 852870a4a0e7e469a02607f1691b579caffda57c Mon Sep 17 00:00:00 2001 From: Artem Zakharchenko Date: Fri, 4 Oct 2024 15:33:16 +0200 Subject: [PATCH 13/17] test: add runtime request handler test --- .../sse-api/sse.server.connect.test.ts | 4 +- test/browser/sse-api/sse.use.test.ts | 58 +++++++++++++++++++ 2 files changed, 59 insertions(+), 3 deletions(-) create mode 100644 test/browser/sse-api/sse.use.test.ts diff --git a/test/browser/sse-api/sse.server.connect.test.ts b/test/browser/sse-api/sse.server.connect.test.ts index f031bab3b..b7a4dbc5e 100644 --- a/test/browser/sse-api/sse.server.connect.test.ts +++ b/test/browser/sse-api/sse.server.connect.test.ts @@ -91,7 +91,7 @@ test('forwards message event from the server to the client automatically', async const { setupWorker, sse } = window.msw const worker = setupWorker( - sse(url, ({ client, server }) => { + sse(url, ({ server }) => { server.connect() }), ) @@ -102,8 +102,6 @@ test('forwards message event from the server to the client automatically', async return new Promise((resolve, reject) => { const source = new EventSource(url) source.addEventListener('message', (event) => { - console.log('client got:', event.type, event.data) - resolve(JSON.parse(event.data)) }) source.onerror = () => reject(new Error('EventSource connection failed')) diff --git a/test/browser/sse-api/sse.use.test.ts b/test/browser/sse-api/sse.use.test.ts new file mode 100644 index 000000000..325acee37 --- /dev/null +++ b/test/browser/sse-api/sse.use.test.ts @@ -0,0 +1,58 @@ +import { setupWorker, sse } from 'msw/browser' +import { test, expect } from '../playwright.extend' + +declare namespace window { + export const msw: { + setupWorker: typeof setupWorker + sse: typeof sse + } +} + +test('supports server-sent event handler overrides', async ({ + loadExample, + page, +}) => { + await loadExample(require.resolve('./sse.mocks.ts'), { + skipActivation: true, + }) + + const workerRef = await page.evaluateHandle(async () => { + const { setupWorker, sse } = window.msw + + const worker = setupWorker( + sse('http://localhost/stream', ({ client }) => { + client.send({ data: 'happy-path' }) + }), + ) + await worker.start() + return worker + }) + + await page.evaluate((worker) => { + const { sse } = window.msw + + worker.use( + // Adding this runtime request handler will make it + // take precedence over the happy path handler above. + sse('http://localhost/stream', ({ client }) => { + // Queue the data for the next tick to rule out + // the happy path handler from executing. + queueMicrotask(() => { + client.send({ data: 'override' }) + }) + }), + ) + }, workerRef) + + const message = await page.evaluate(() => { + return new Promise((resolve, reject) => { + const source = new EventSource('http://localhost/stream') + source.addEventListener('message', (event) => { + resolve(event.data) + }) + source.onerror = reject + }) + }) + + expect(message).toBe('override') +}) From 4d6ccfe93e2ece0be016d3a351de252678a17f2d Mon Sep 17 00:00:00 2001 From: Artem Zakharchenko Date: Fri, 4 Oct 2024 15:52:47 +0200 Subject: [PATCH 14/17] chore: use v22 for ci only --- .github/workflows/ci.yml | 2 +- .github/workflows/release.yml | 2 +- .github/workflows/typescript-nightly.yml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 238d0356a..1730987dd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,7 +19,7 @@ jobs: - name: Set up Node.js uses: actions/setup-node@v3 with: - node-version: 20 + node-version: 22 - uses: pnpm/action-setup@v4 with: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 3fb85b0e1..7ed8349f9 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -18,7 +18,7 @@ jobs: - name: Setup Node.js uses: actions/setup-node@v3 with: - node-version: 20 + node-version: 18 always-auth: true registry-url: https://registry.npmjs.org diff --git a/.github/workflows/typescript-nightly.yml b/.github/workflows/typescript-nightly.yml index 031f0c64d..70b81abb6 100644 --- a/.github/workflows/typescript-nightly.yml +++ b/.github/workflows/typescript-nightly.yml @@ -13,7 +13,7 @@ jobs: - name: Set up Node.js uses: actions/setup-node@v3 with: - node-version: 20 + node-version: 18 - name: Get latest TypeScript version id: get-versions From 8afa887b63086edaea0e15e3862c657d43022cc5 Mon Sep 17 00:00:00 2001 From: Artem Zakharchenko Date: Fri, 4 Oct 2024 15:53:07 +0200 Subject: [PATCH 15/17] test: improve axios errors in test --- test/node/third-party/axios-upload.node.test.ts | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/test/node/third-party/axios-upload.node.test.ts b/test/node/third-party/axios-upload.node.test.ts index f7f2c068a..ca5089a4e 100644 --- a/test/node/third-party/axios-upload.node.test.ts +++ b/test/node/third-party/axios-upload.node.test.ts @@ -45,11 +45,13 @@ it('responds with a mocked response to an upload request', async () => { const file = new Blob(['Hello', 'world'], { type: 'text/plain' }) formData.set('file', file, 'doc.txt') - await expect(request.post('/upload', formData)).resolves.toMatchObject({ - data: { - message: 'Successfully uploaded "doc.txt"!', - content: 'Helloworld', - }, + const response = await request.post('/upload', formData).catch((error) => { + throw error.response.data + }) + + expect(response.data).toEqual({ + message: 'Successfully uploaded "doc.txt"!', + content: 'Helloworld', }) expect(onUploadProgress.mock.calls.length).toBeGreaterThan(0) From bbc45532afb2c149c3072de3f4f5216681183b37 Mon Sep 17 00:00:00 2001 From: Artem Zakharchenko Date: Thu, 10 Oct 2024 18:56:47 +0200 Subject: [PATCH 16/17] fix: fail connection on fetch failure --- src/browser/sse.ts | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/browser/sse.ts b/src/browser/sse.ts index 4f7987bb1..f2472e7c3 100644 --- a/src/browser/sse.ts +++ b/src/browser/sse.ts @@ -393,8 +393,15 @@ class ObservableEventSource extends EventTarget implements EventSource { } private async connect() { - const response = await fetch(this[kRequest]) - this.processResponse(response) + await fetch(this[kRequest]) + .then((response) => { + this.processResponse(response) + }) + .catch(() => { + // Fail the connection on request errors instead of + // throwing a generic "Failed to fetch" error. + this.failConnection() + }) } private processResponse(response: Response): void { From 2afd608a9dc044f20f18bcbc97a6b1e0b4e2e98d Mon Sep 17 00:00:00 2001 From: Artem Zakharchenko Date: Thu, 10 Oct 2024 18:57:14 +0200 Subject: [PATCH 17/17] fix: prevent dispatching message if data is empty --- src/browser/sse.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/browser/sse.ts b/src/browser/sse.ts index f2472e7c3..1269ebf32 100644 --- a/src/browser/sse.ts +++ b/src/browser/sse.ts @@ -644,6 +644,13 @@ class EventSourceParsingStream extends WritableStream { private processLine(line: Uint8Array, fieldLength: number): void { // New line indicates the end of the message. Dispatch it. if (line.length === 0) { + // Prevent dispatching the message if the data is an empty string. + // That is a no-op per spec. + if (this.message.data === undefined) { + this.message.event = undefined + return + } + this.underlyingSink.message(this.message) this.resetMessage() return