diff --git a/.env.test b/.env.test index 56eee10..b23a277 100644 --- a/.env.test +++ b/.env.test @@ -15,4 +15,7 @@ TLDRAW_WEBSOCKET_PORT=3399 X_API_ALLOWED_KEYS=randomString -NOT_AUTHENTICATED_REDIRECT_URL=http://localhost:4000/login \ No newline at end of file +WORKER_TASK_DEBOUNCE=500 +WORKER_MIN_MESSAGE_LIFETIME=1000 + +NOT_AUTHENTICATED_REDIRECT_URL=http://localhost:4000/login diff --git a/src/modules/server/api/test/websocket.api.spec.ts b/src/modules/server/api/test/websocket.api.spec.ts index 1009496..7c639c0 100644 --- a/src/modules/server/api/test/websocket.api.spec.ts +++ b/src/modules/server/api/test/websocket.api.spec.ts @@ -57,14 +57,6 @@ describe('Websocket Api Test', () => { return isSynced; }); - /* const waitUntilDocValueMatches = (ydoc: Doc, key: string, value: number): Promise => - promise.until(0, () => { - const result = ydoc.getMap().get(key); - const isMatch = result === value; - - return isMatch; - }); */ - describe('when clients have permission for room', () => { describe('when two clients connect to the same doc before any changes', () => { const setup = () => { @@ -83,14 +75,14 @@ describe('Websocket Api Test', () => { error: null, }); - const { ydoc: client1Doc } = createWsClient(room); - const { ydoc: client2Doc } = createWsClient(room); + const { ydoc: client1Doc, provider: provider1 } = createWsClient(room); + const { ydoc: client2Doc, provider: provider2 } = createWsClient(room); - return { client1Doc, client2Doc }; + return { client1Doc, client2Doc, provider1, provider2 }; }; it('syncs doc changes of first client to second client', async () => { - const { client1Doc, client2Doc } = setup(); + const { client1Doc, client2Doc, provider1, provider2 } = setup(); client1Doc.getMap().set('a', 1); @@ -98,10 +90,15 @@ describe('Websocket Api Test', () => { const result = client2Doc.getMap().get('a'); expect(result).toBe(1); + + provider1.awareness.destroy(); + provider2.awareness.destroy(); + provider1.destroy(); + provider2.destroy(); }); it('syncs subsequent doc changes of second client to first client', async () => { - const { client1Doc, client2Doc } = setup(); + const { client1Doc, client2Doc, provider1, provider2 } = setup(); client1Doc.getMap().set('a', 1); await waitUntilDocsEqual(client1Doc, client2Doc); @@ -111,20 +108,12 @@ describe('Websocket Api Test', () => { const result = client1Doc.getMap().get('a'); expect(result).toBe(2); - }); - - /* it('syncs nearly parallel doc changes of second client to first client', async () => { - // This test is instable - const { client1Doc, client2Doc } = setup(); - - client1Doc.getMap().set('a', 1); - client2Doc.getMap().set('a', 2); - - await waitUntilDocValueMatches(client1Doc, 'a', 2); - const result = client1Doc.getMap().get('a'); - expect(result).toBe(2); - }); */ + provider1.awareness.destroy(); + provider2.awareness.destroy(); + provider1.destroy(); + provider2.destroy(); + }); }); describe('when two clients connect to the same doc one before and one after the changes', () => { @@ -132,36 +121,47 @@ describe('Websocket Api Test', () => { const randomString = Math.random().toString(36).substring(7); const room = randomString; - authorizationService.hasPermission.mockResolvedValue({ + authorizationService.hasPermission.mockResolvedValueOnce({ + hasWriteAccess: true, + room: randomString, + userid: 'userId1', + error: null, + }); + authorizationService.hasPermission.mockResolvedValueOnce({ hasWriteAccess: true, room: randomString, - userid: 'userId', + userid: 'userId2', error: null, }); - const { ydoc: client1Doc } = createWsClient(room); + const { ydoc: client1Doc, provider } = createWsClient(room); - return { client1Doc, room }; + return { client1Doc, room, provider }; }; it('syncs doc changes of first client to second client', async () => { - const { client1Doc, room } = setup(); + const { client1Doc, room, provider } = setup(); client1Doc.getMap().set('a', 1); - const { ydoc: client2Doc } = createWsClient(room); + const { ydoc: client2Doc, provider: provider2 } = createWsClient(room); await waitUntilDocsEqual(client1Doc, client2Doc); const result = client2Doc.getMap().get('a'); expect(result).toBe(1); + + provider.awareness.destroy(); + provider.destroy(); + provider2.awareness.destroy(); + provider2.destroy(); }); it('syncs subsequent doc changes of second client to first client', async () => { - const { client1Doc, room } = setup(); + const { client1Doc, room, provider } = setup(); client1Doc.getMap().set('a', 1); - const { ydoc: client2Doc } = createWsClient(room); + const { ydoc: client2Doc, provider: provider2 } = createWsClient(room); await waitUntilDocsEqual(client1Doc, client2Doc); client2Doc.getMap().set('a', 2); @@ -169,27 +169,13 @@ describe('Websocket Api Test', () => { const result = client1Doc.getMap().get('a'); expect(result).toBe(2); - }); - - /* it('syncs nearly parallel doc changes of second client to first client', async () => { - // This test is instable - const { client1Doc, room } = setup(); - - client1Doc.getMap().set('a', 1); - - const { ydoc: client2Doc } = createWsClient(room); - client2Doc.getMap().set('a', 2); - - await waitUntilDocValueMatches(client1Doc, 'a', 2); - const result = client1Doc.getMap().get('a'); - expect(result).toBe(2); - }); */ + provider.awareness.destroy(); + provider.destroy(); + provider2.awareness.destroy(); + provider2.destroy(); + }); }); - - /* describe('when doc is only pesisted in storage and not in redis', () => { - // Need to implement this test - }); */ }); describe('when client has no permission for room', () => { @@ -199,14 +185,14 @@ describe('Websocket Api Test', () => { const room = randomString; const errorResponse = ResponsePayloadBuilder.buildWithError(4401, 'Unauthorized'); - authorizationService.hasPermission.mockResolvedValue(errorResponse); + authorizationService.hasPermission.mockResolvedValueOnce(errorResponse); const { ydoc: client1Doc, provider } = createWsClient(room); return { client1Doc, provider }; }; - it('syncs doc changes of first client to second client', async () => { + it('returns unauthorized error', async () => { const { provider } = setup(); let error: CloseEvent; @@ -224,6 +210,9 @@ describe('Websocket Api Test', () => { expect(error.reason).toBe('Unauthorized'); // @ts-ignore expect(error.code).toBe(4401); + + provider.awareness.destroy(); + provider.destroy(); }); }); @@ -233,14 +222,14 @@ describe('Websocket Api Test', () => { const room = randomString; const response = ResponsePayloadBuilder.build(null, 'userId'); - authorizationService.hasPermission.mockResolvedValue(response); + authorizationService.hasPermission.mockResolvedValueOnce(response); const { ydoc: client1Doc, provider } = createWsClient(room); return { client1Doc, provider }; }; - it('syncs doc changes of first client to second client', async () => { + it('returns error', async () => { const { provider } = setup(); let error: CloseEvent; @@ -258,24 +247,24 @@ describe('Websocket Api Test', () => { expect(error.reason).toBe('Missing room or userid'); // @ts-ignore expect(error.code).toBe(1008); + + provider.awareness.destroy(); + provider.destroy(); }); }); }); - /*describe('when openCallback catch an error', () => { + describe('when openCallback catch an error', () => { const setup = () => { const randomString = Math.random().toString(36).substring(7); const room = randomString; - const response = ResponsePayloadBuilder.build(room, 'userId'); - authorizationService.hasPermission.mockResolvedValue(response); - const { ydoc: client1Doc, provider } = createWsClient(room); return { client1Doc, provider }; }; - it('syncs doc changes of first client to second client', async () => { + it('returns internal server error', async () => { const { provider } = setup(); let error: CloseEvent; @@ -284,16 +273,15 @@ describe('Websocket Api Test', () => { error = event as CloseEvent; }; } - //spyOn(provider.ws, 'end').and.callThrough(); await promise.until(0, () => { return error as unknown as boolean; }); // @ts-ignore - //expect(error.reason).toBe('Internal Server Error'); + expect(error.reason).toBe('Internal Server Error'); // @ts-ignore expect(error.code).toBe(1011); }); - });*/ + }); }); diff --git a/src/modules/worker/worker.api.spec.ts b/src/modules/worker/worker.api.spec.ts new file mode 100644 index 0000000..1382540 --- /dev/null +++ b/src/modules/worker/worker.api.spec.ts @@ -0,0 +1,228 @@ +import { createMock, DeepMocked } from '@golevelup/ts-jest'; +import { INestApplication } from '@nestjs/common'; +import { Test } from '@nestjs/testing'; +import { WebSocket } from 'ws'; +import { WebsocketProvider } from 'y-websocket'; +import * as Y from 'yjs'; +import { Doc } from 'yjs'; +import { AuthorizationService } from '../../infra/authorization/authorization.service.js'; +import { IoRedisAdapter } from '../../infra/redis/ioredis.adapter.js'; +import { StorageService } from '../../infra/storage/storage.service.js'; +import { computeRedisRoomStreamName } from '../../infra/y-redis/helper.js'; +import { REDIS_FOR_DELETION } from '../../modules/server/server.const.js'; +import { ServerModule } from '../../modules/server/server.module.js'; +import { WorkerModule } from './worker.module.js'; +import { WorkerService } from './worker.service.js'; + +describe('Worker Api Test', () => { + let app: INestApplication; + let authorizationService: DeepMocked; + let workerService: WorkerService; + let storageService: StorageService; + let ioRedisAdapter: IoRedisAdapter; + + // This port must be different from the one used in the websocket api test + // because tests are executed parallel and therefore we can have port conflicts. + const port = 3398; + const url = `ws://localhost:${port}`; + + process.env.TLDRAW_WEBSOCKET_URL = url; + process.env.TLDRAW_WEBSOCKET_PORT = port.toString(); + + beforeAll(async () => { + const moduleFixture = await Test.createTestingModule({ + imports: [ServerModule, WorkerModule], + }) + .overrideProvider(AuthorizationService) + .useValue(createMock()) + .compile(); + + app = moduleFixture.createNestApplication(); + await app.init(); + authorizationService = await app.resolve(AuthorizationService); + workerService = await app.resolve(WorkerService); + storageService = await app.resolve(StorageService); + ioRedisAdapter = await app.resolve(REDIS_FOR_DELETION); + }); + + afterAll(async () => { + await app.close(); + }); + + const createWsClient = (room: string) => { + const ydoc = new Doc(); + const serverUrl = url; + const prefix = 'y'; + const provider = new WebsocketProvider(serverUrl, prefix + '-' + room, ydoc, { + // @ts-ignore + WebSocketPolyfill: WebSocket, + connect: true, + disableBc: true, + }); + + return { ydoc, provider }; + }; + + describe('when one client sends update', () => { + const setup = () => { + workerService.start(); + + const room = Math.random().toString(36).substring(7); + + authorizationService.hasPermission.mockResolvedValueOnce({ + hasWriteAccess: true, + room, + userid: 'userId1', + error: null, + }); + + const { ydoc: client1Doc, provider } = createWsClient(room); + + const property = 'property'; + const value = 'value'; + + client1Doc.getMap().set(property, value); + + return { client1Doc, room, property, value, provider }; + }; + + it('saves doc to storage', async () => { + const { room, property, value, provider } = setup(); + + let doc; + while (!doc) { + doc = await storageService.retrieveDoc(room, 'index'); + } + + let decodedDoc; + if (doc?.doc) { + decodedDoc = Y.decodeUpdateV2(doc.doc); + } + + // @ts-ignore + const resultProperty1 = decodedDoc.structs[0].parentSub; + // @ts-ignore + const resultUpdateValue = decodedDoc.structs[0].content.arr[0]; + + expect(resultProperty1).toBe(property); + expect(resultUpdateValue).toBe(value); + + workerService.stop(); + provider.awareness.destroy(); + provider.disconnect(); + provider.destroy(); + }); + }); + + describe('when second client sends update', () => { + const setup = () => { + workerService.start(); + + const room = Math.random().toString(36).substring(7); + + authorizationService.hasPermission.mockResolvedValueOnce({ + hasWriteAccess: true, + room, + userid: 'userId1', + error: null, + }); + + authorizationService.hasPermission.mockResolvedValueOnce({ + hasWriteAccess: true, + room, + userid: 'userId2', + error: null, + }); + + const { ydoc: client1Doc, provider: provider1 } = createWsClient(room); + const { ydoc: client2Doc, provider: provider2 } = createWsClient(room); + + const property1 = 'property1'; + const property2 = 'property2'; + const value1 = 'value1'; + const value2 = 'value2'; + + client1Doc.getMap().set(property1, value1); + client2Doc.getMap().set(property2, value2); + + return { client1Doc, room, property2, value2, provider1, provider2 }; + }; + + it('saves doc to storage', async () => { + const { room, property2, value2, provider1, provider2 } = setup(); + + let resultProperty; + let resultValue; + while (!resultProperty) { + const doc = await storageService.retrieveDoc(room, 'index'); + if (doc?.doc) { + const decodedDoc = Y.decodeUpdateV2(doc.doc); + // @ts-ignore + resultProperty = decodedDoc.structs.find((item) => item.parentSub === property2).parentSub; + // @ts-ignore + resultValue = decodedDoc.structs.find((item) => item.content.arr[0] === value2).content.arr[0]; + } + } + + expect(resultProperty).toBe(property2); + expect(resultValue).toBe(value2); + + workerService.stop(); + provider1.awareness.destroy(); + provider2.awareness.destroy(); + provider1.destroy(); + provider2.destroy(); + }); + }); + + describe('when deleted doc entry exists', () => { + const setup = async () => { + workerService.start(); + + const room = Math.random().toString(36).substring(7); + + authorizationService.hasPermission.mockResolvedValueOnce({ + hasWriteAccess: true, + room, + userid: 'userId1', + error: null, + }); + + const { ydoc: client1Doc, provider } = createWsClient(room); + + const property = 'property'; + const value = 'value'; + + client1Doc.getMap().set(property, value); + + let doc; + while (!doc) { + doc = await storageService.retrieveDoc(room, 'index'); + } + + provider.disconnect(); + provider.awareness.destroy(); + provider.destroy(); + + const streamName = computeRedisRoomStreamName(room, 'index', 'y'); + await ioRedisAdapter.markToDeleteByDocName(streamName); + + return { room }; + }; + + it('deletes doc in storage', async () => { + const { room } = await setup(); + + let doc = undefined; + while (doc !== null) { + try { + doc = await storageService.retrieveDoc(room, 'index'); + } catch {} + } + + expect(doc).toBeNull(); + + workerService.stop(); + }, 10000); + }); +});