diff --git a/package.json b/package.json index 09423d0..95bf044 100644 --- a/package.json +++ b/package.json @@ -47,7 +47,7 @@ "socket.io": "^4.7.5", "socket.io-adapter": "^2.5.4", "socket.io-redis": "^6.1.1", - "waterbus-proto": "^1.1.5", + "waterbus-proto": "^1.1.11", "werift": "^0.19.1", "winston": "^3.13.0" }, diff --git a/src/domain/constants/socket_events.ts b/src/domain/constants/socket_events.ts index 8a53312..a54443b 100644 --- a/src/domain/constants/socket_events.ts +++ b/src/domain/constants/socket_events.ts @@ -32,10 +32,20 @@ const SocketEvent = { subscriberRenegotiationSSC: 'SUBSCRIBER_RENEGOTIATION_SSC', + // White board + startWhiteBoardSSC: 'START_WHITE_BOARD_SSC', + startWhiteBoardCSS: 'START_WHITE_BOARD_CSS', + updateWhiteBoardCSS: 'UPDATE_WHITE_BOARD_CSS', + updateWhiteBoardSSC: 'UPDATE_WHITE_BOARD_SSC', + cleanWhiteBoardCSS: 'CLEAN_WHITE_BOARD_CSS', + cleanWhiteBoardSSC: 'CLEAN_WHITE_BOARD_SSC', + // Chats sendMessageSSC: 'SEND_MESSAGE_SSC', updateMessageSSC: 'UPDATE_MESSAGE_SSC', deleteMessageSSC: 'DELETE_MESSAGE_SSC', + newMemberJoinedSSC: 'NEW_MEMBER_JOINED_SSC', + newInvitationSSC: 'NEW_INVITATION_SSC', // System connection: 'connection', diff --git a/src/domain/models/white-board-action.ts b/src/domain/models/white-board-action.ts new file mode 100644 index 0000000..318957f --- /dev/null +++ b/src/domain/models/white-board-action.ts @@ -0,0 +1,4 @@ +export enum WhiteBoardAction { + add = 'add', + remove = 'remove', +} diff --git a/src/infrastructure/client-proxy/client-proxy.module.ts b/src/infrastructure/client-proxy/client-proxy.module.ts index 1f0eb04..64dec97 100644 --- a/src/infrastructure/client-proxy/client-proxy.module.ts +++ b/src/infrastructure/client-proxy/client-proxy.module.ts @@ -20,6 +20,12 @@ export const getGrpcClientOptions = ( case EPackage.MEETING: url = config.getMeetingGrpcUrl(); break; + case EPackage.WHITEBOARD: + url = config.getWhiteBoardGrpcUrl(); + break; + case EPackage.RECORD: + url = config.getRecordGrpcUrl(); + break; } return { transport: Transport.GRPC, @@ -40,6 +46,8 @@ export const getGrpcClientOptions = ( export class ClientProxyModule { static authClientProxy = 'authClientProxy'; static meetingClientProxy = 'meetingClientProxy'; + static whiteBoardClientProxy = 'whiteBoardClientProxy'; + static recordClientProxy = 'recordClientProxy'; static register(): DynamicModule { return { @@ -61,10 +69,28 @@ export class ClientProxyModule { getGrpcClientOptions(config, EPackage.MEETING), ), }, + { + provide: ClientProxyModule.whiteBoardClientProxy, + inject: [EnvironmentConfigService], + useFactory: (config: EnvironmentConfigService) => + ClientProxyFactory.create( + getGrpcClientOptions(config, EPackage.WHITEBOARD), + ), + }, + { + provide: ClientProxyModule.recordClientProxy, + inject: [EnvironmentConfigService], + useFactory: (config: EnvironmentConfigService) => + ClientProxyFactory.create( + getGrpcClientOptions(config, EPackage.RECORD), + ), + }, ], exports: [ ClientProxyModule.authClientProxy, ClientProxyModule.meetingClientProxy, + ClientProxyModule.whiteBoardClientProxy, + ClientProxyModule.recordClientProxy, ], }; } diff --git a/src/infrastructure/config/environment/environments.ts b/src/infrastructure/config/environment/environments.ts index e30ebf1..2141396 100644 --- a/src/infrastructure/config/environment/environments.ts +++ b/src/infrastructure/config/environment/environments.ts @@ -26,6 +26,14 @@ export class EnvironmentConfigService { return this.configService.get('MEETING_GRPC_ADDRESS'); } + getRecordGrpcUrl(): string { + return this.configService.get('RECORD_GRPC_ADDRESS'); + } + + getWhiteBoardGrpcUrl(): string { + return this.configService.get('WHITE_BOARD_GRPC_ADDRESS'); + } + getTurnUsername(): string { return this.configService.get('TURN_USERNAME'); } diff --git a/src/infrastructure/controllers/chats/chats.proto.controller.ts b/src/infrastructure/controllers/chats/chats.proto.controller.ts index 8c72ed4..9c8971d 100644 --- a/src/infrastructure/controllers/chats/chats.proto.controller.ts +++ b/src/infrastructure/controllers/chats/chats.proto.controller.ts @@ -103,4 +103,77 @@ export class ChatGrpcController implements chat.ChatService { }); } } + + @GrpcMethod('ChatService', 'newMemberJoined') + newMemberJoined( + data: chat.NewMemberJoinedRequest, + ): Observable { + try { + const payload = { + member: data.member, + meetingId: data.meetingId, + }; + + this.socketGateway.emitTo({ + data: payload, + room: null, + event: SocketEvent.newMemberJoinedSSC, + socketIds: data.ccus, + }); + + const response: chat.MessageResponse = { + succeed: true, + }; + + return new Observable((observer) => { + observer.next(response); + observer.complete(); + }); + } catch (error) { + const response: chat.MessageResponse = { + succeed: false, + }; + + return new Observable((observer) => { + observer.next(response); + observer.complete(); + }); + } + } + + @GrpcMethod('ChatService', 'newInvitation') + newInvitation( + data: chat.NewInvitationRequest, + ): Observable { + try { + const payload = { + meeting: data.room, + }; + + this.socketGateway.emitTo({ + data: payload, + room: null, + event: SocketEvent.newInvitationSSC, + socketIds: data.ccus, + }); + + const response: chat.MessageResponse = { + succeed: true, + }; + + return new Observable((observer) => { + observer.next(response); + observer.complete(); + }); + } catch (error) { + const response: chat.MessageResponse = { + succeed: false, + }; + + return new Observable((observer) => { + observer.next(response); + observer.complete(); + }); + } + } } diff --git a/src/infrastructure/gateways/gateway.module.ts b/src/infrastructure/gateways/gateway.module.ts index e2fbbca..03c1b67 100644 --- a/src/infrastructure/gateways/gateway.module.ts +++ b/src/infrastructure/gateways/gateway.module.ts @@ -8,6 +8,8 @@ import { WebRTCModule } from 'src/infrastructure/services/sfu/webrtc.module'; import { EnvironmentConfigModule } from '../config/environment/environment.module'; import { AuthGrpcService } from '../services/auth/auth.service'; import { MessageBroker } from '../services/message-broker/message-broker'; +import { WhiteBoardGrpcService } from '../services/meeting/white-board.service'; +import { RecordGrpcService } from '../services/meeting/record.service'; @Module({ imports: [ @@ -27,6 +29,18 @@ import { MessageBroker } from '../services/message-broker/message-broker'; useFactory: (clientProxy: ClientGrpc) => new MeetingGrpcService(clientProxy), }, + { + provide: WhiteBoardGrpcService, + inject: [ClientProxyModule.whiteBoardClientProxy], + useFactory: (clientProxy: ClientGrpc) => + new WhiteBoardGrpcService(clientProxy), + }, + { + provide: RecordGrpcService, + inject: [ClientProxyModule.recordClientProxy], + useFactory: (clientProxy: ClientGrpc) => + new RecordGrpcService(clientProxy), + }, MessageBroker, SocketGateway, MeetingGateway, diff --git a/src/infrastructure/gateways/meeting/dtos/clean_board.dto.ts b/src/infrastructure/gateways/meeting/dtos/clean_board.dto.ts new file mode 100644 index 0000000..574e8a4 --- /dev/null +++ b/src/infrastructure/gateways/meeting/dtos/clean_board.dto.ts @@ -0,0 +1,6 @@ +import { ApiProperty } from '@nestjs/swagger'; + +export class CleanWhiteBoardDto { + @ApiProperty({ type: String }) + roomId: string; +} diff --git a/src/infrastructure/gateways/meeting/dtos/start_white_board.dto.ts b/src/infrastructure/gateways/meeting/dtos/start_white_board.dto.ts new file mode 100644 index 0000000..d344802 --- /dev/null +++ b/src/infrastructure/gateways/meeting/dtos/start_white_board.dto.ts @@ -0,0 +1,6 @@ +import { ApiProperty } from '@nestjs/swagger'; + +export class StartWhiteBoardDto { + @ApiProperty({ type: String }) + roomId: string; +} diff --git a/src/infrastructure/gateways/meeting/dtos/update_white_board.dto.ts b/src/infrastructure/gateways/meeting/dtos/update_white_board.dto.ts new file mode 100644 index 0000000..853f0fe --- /dev/null +++ b/src/infrastructure/gateways/meeting/dtos/update_white_board.dto.ts @@ -0,0 +1,28 @@ +import { ApiProperty } from '@nestjs/swagger'; +import { WhiteBoardAction } from 'src/domain/models/white-board-action'; + +export interface PaintModel { + color: string; + offsets: OffsetModel[]; + width: number; +} + +export interface OffsetModel { + dx: number; + dy: number; +} + +export class UpdateWhiteBoardDto { + @ApiProperty({ type: String }) + roomId: string; + + @ApiProperty({ + type: 'enum', + enum: WhiteBoardAction, + default: WhiteBoardAction.add, + }) + action: string; + + @ApiProperty({ type: 'simple-json' }) + paints: PaintModel[]; +} diff --git a/src/infrastructure/gateways/meeting/meeting.gateway.ts b/src/infrastructure/gateways/meeting/meeting.gateway.ts index 7c29517..1041896 100644 --- a/src/infrastructure/gateways/meeting/meeting.gateway.ts +++ b/src/infrastructure/gateways/meeting/meeting.gateway.ts @@ -23,7 +23,9 @@ import { MessageBroker } from 'src/infrastructure/services/message-broker/messag import { EnvironmentConfigService } from 'src/infrastructure/config/environment/environments'; import { SetSubscribeSubtitleDto } from './dtos/set_subscribe_subtitle.dto'; import { PublisherRenegotiationDto } from './dtos/publisher_renegotiation.dto'; -import { SubscriberRenegotiationDto } from './dtos/subscriber_renegotiation.dto'; +import { StartWhiteBoardDto } from './dtos/start_white_board.dto'; +import { UpdateWhiteBoardDto } from './dtos/update_white_board.dto'; +import { CleanWhiteBoardDto } from './dtos/clean_board.dto'; @WebSocketGateway() export class MeetingGateway { @@ -347,6 +349,34 @@ export class MeetingGateway { }); } + @SubscribeMessage(SocketEvent.startWhiteBoardCSS) + handleStartWhiteBoard( + client: ISocketClient, + payload: StartWhiteBoardDto, + ): any { + client.join(this._getWhiteBoardRoom(payload.roomId)); + } + + @SubscribeMessage(SocketEvent.updateWhiteBoardCSS) + handleUpdateWhiteBoard( + client: ISocketClient, + payload: UpdateWhiteBoardDto, + ): any { + client + .to(this._getWhiteBoardRoom(payload.roomId)) + .emit(SocketEvent.updateWhiteBoardSSC, payload); + } + + @SubscribeMessage(SocketEvent.cleanWhiteBoardCSS) + handleCleanWhiteBoard( + client: ISocketClient, + payload: CleanWhiteBoardDto, + ): any { + client + .to(this._getWhiteBoardRoom(payload.roomId)) + .to(SocketEvent.cleanWhiteBoardSSC); + } + @SubscribeMessage(SocketEvent.setSubscribeSubtitleCSS) handleSetSubscribeSubtitle( client: ISocketClient, @@ -386,4 +416,8 @@ export class MeetingGateway { this.logger.debug(`Update leave room in grpc: ${succeed}`); } } + + private _getWhiteBoardRoom(roomId: string): string { + return `white-board-${roomId}`; + } } diff --git a/src/infrastructure/gateways/socket/socket.gateway.ts b/src/infrastructure/gateways/socket/socket.gateway.ts index bfcb898..6247cea 100644 --- a/src/infrastructure/gateways/socket/socket.gateway.ts +++ b/src/infrastructure/gateways/socket/socket.gateway.ts @@ -133,10 +133,12 @@ export class SocketGateway }: { data: any; event: string; - room: string; + room: string | null; socketIds: string[]; }) { - this.server.to(room).emit(event, data); + if (room) { + this.server.to(room).emit(event, data); + } if (socketIds) { this.server.to(socketIds).emit(event, data); diff --git a/src/infrastructure/services/meeting/record.service.ts b/src/infrastructure/services/meeting/record.service.ts new file mode 100644 index 0000000..5fad393 --- /dev/null +++ b/src/infrastructure/services/meeting/record.service.ts @@ -0,0 +1,154 @@ +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { + BehaviorSubject, + catchError, + interval, + lastValueFrom, + map, + retry, + Subject, + Subscription, + switchMap, + tap, + throwError, + timeout, +} from 'rxjs'; +import { ClientGrpc } from '@nestjs/microservices'; +import { ClientService } from 'src/domain/models/client-service'; +import { recordtrack } from 'waterbus-proto'; +import { Status } from '@grpc/grpc-js/build/src/constants'; + +@Injectable() +export class RecordGrpcService implements OnModuleInit { + private readonly logger: Logger; + private recordService: recordtrack.RecordService; + private $connectionSubject: BehaviorSubject; + private isConnected: boolean; + private $reconnect: Subscription; + + constructor(private readonly recordClientProxy: ClientGrpc) { + this.logger = new Logger(RecordGrpcService.name); + } + + onModuleInit() { + this.connect(); + this.$connectionSubject = new BehaviorSubject(false); + this.$connectionSubject.subscribe({ + next: (status) => { + this.isConnected = status; + if (!status) { + this.$reconnect = interval(5000) + .pipe() + .subscribe({ + next: () => { + this.logger.log('Retry to connect...'); + this.connect(); + this.checkConnection(); + }, + }); + } else { + this.$reconnect.unsubscribe(); + } + }, + }); + } + + connect(): void { + this.recordService = new ClientService( + this.recordClientProxy, + 'RecordService', + ).getInstance(); + } + + private checkConnection(): void { + this.recordService + .ping({ message: 'ping' }) + .pipe(timeout(500), retry(3)) + .subscribe({ + next: (result) => { + this.logger.log('Connected'); + const status = result?.message === 'ping'; + if (this.isConnected !== status) { + this.$connectionSubject.next(status); + } + }, + error: () => { + if (this.isConnected) this.$connectionSubject.next(false); + }, + }); + } + + async startRecord( + data: recordtrack.StartRecordRequest, + ): Promise { + const dataSubject = new Subject(); + this.$connectionSubject + .pipe( + switchMap((isConnected) => { + if (isConnected) { + return this.recordService.startRecord(data).pipe(timeout(5000)); + } else + return throwError(() => ({ + code: Status.UNAVAILABLE, + message: 'The service is currently unavailable', + })); + }), + catchError((error) => { + if ( + (error?.code === Status.UNAVAILABLE || + error?.name === 'TimeoutError') && + this.isConnected + ) + this.$connectionSubject.next(false); + return throwError(() => error); + }), + tap((data) => dataSubject.next(data)), + tap(() => dataSubject.complete()), + ) + .subscribe({ + error: (err) => dataSubject.error(err), + }); + try { + return await lastValueFrom(dataSubject.pipe(map((response) => response))); + } catch (error) { + this.logger.error(error.toString()); + } + } + + async stopRecord( + data: recordtrack.StopRecordRequest, + ): Promise { + const dataSubject = new Subject(); + this.$connectionSubject + .pipe( + switchMap((isConnected) => { + if (isConnected) { + return this.recordService.stopRecord(data).pipe(timeout(5000)); + } else + return throwError(() => ({ + code: Status.UNAVAILABLE, + message: 'The service is currently unavailable', + })); + }), + catchError((error) => { + if ( + (error?.code === Status.UNAVAILABLE || + error?.name === 'TimeoutError') && + this.isConnected + ) + this.$connectionSubject.next(false); + return throwError(() => error); + }), + tap((data) => dataSubject.next(data)), + tap(() => dataSubject.complete()), + ) + .subscribe({ + error: (err) => dataSubject.error(err), + }); + try { + return await lastValueFrom(dataSubject.pipe(map((response) => response))); + } catch (error) { + this.logger.error(error.toString()); + } + } +} diff --git a/src/infrastructure/services/meeting/white-board.service.ts b/src/infrastructure/services/meeting/white-board.service.ts new file mode 100644 index 0000000..f919f7e --- /dev/null +++ b/src/infrastructure/services/meeting/white-board.service.ts @@ -0,0 +1,154 @@ +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { + BehaviorSubject, + catchError, + interval, + lastValueFrom, + map, + retry, + Subject, + Subscription, + switchMap, + tap, + throwError, + timeout, +} from 'rxjs'; +import { ClientGrpc } from '@nestjs/microservices'; +import { ClientService } from 'src/domain/models/client-service'; +import { whiteboard } from 'waterbus-proto'; +import { Status } from '@grpc/grpc-js/build/src/constants'; + +@Injectable() +export class WhiteBoardGrpcService implements OnModuleInit { + private readonly logger: Logger; + private whiteBoardService: whiteboard.WhiteBoardService; + private $connectionSubject: BehaviorSubject; + private isConnected: boolean; + private $reconnect: Subscription; + + constructor(private readonly whiteBoardClientProxy: ClientGrpc) { + this.logger = new Logger(WhiteBoardGrpcService.name); + } + + onModuleInit() { + this.connect(); + this.$connectionSubject = new BehaviorSubject(false); + this.$connectionSubject.subscribe({ + next: (status) => { + this.isConnected = status; + if (!status) { + this.$reconnect = interval(5000) + .pipe() + .subscribe({ + next: () => { + this.logger.log('Retry to connect...'); + this.connect(); + this.checkConnection(); + }, + }); + } else { + this.$reconnect.unsubscribe(); + } + }, + }); + } + + connect(): void { + this.whiteBoardService = new ClientService( + this.whiteBoardClientProxy, + 'WhiteBoardService', + ).getInstance(); + } + + private checkConnection(): void { + this.whiteBoardService + .ping({ message: 'ping' }) + .pipe(timeout(500), retry(3)) + .subscribe({ + next: (result) => { + this.logger.log('Connected'); + const status = result?.message === 'ping'; + if (this.isConnected !== status) { + this.$connectionSubject.next(status); + } + }, + error: () => { + if (this.isConnected) this.$connectionSubject.next(false); + }, + }); + } + + async getBoard( + data: whiteboard.GetWhiteBoardRequest, + ): Promise { + const dataSubject = new Subject(); + this.$connectionSubject + .pipe( + switchMap((isConnected) => { + if (isConnected) { + return this.whiteBoardService.getBoard(data).pipe(timeout(5000)); + } else + return throwError(() => ({ + code: Status.UNAVAILABLE, + message: 'The service is currently unavailable', + })); + }), + catchError((error) => { + if ( + (error?.code === Status.UNAVAILABLE || + error?.name === 'TimeoutError') && + this.isConnected + ) + this.$connectionSubject.next(false); + return throwError(() => error); + }), + tap((data) => dataSubject.next(data)), + tap(() => dataSubject.complete()), + ) + .subscribe({ + error: (err) => dataSubject.error(err), + }); + try { + return await lastValueFrom(dataSubject.pipe(map((response) => response))); + } catch (error) { + this.logger.error(error.toString()); + } + } + + async updateBoard( + data: whiteboard.UpdateWhiteBoardRequest, + ): Promise { + const dataSubject = new Subject(); + this.$connectionSubject + .pipe( + switchMap((isConnected) => { + if (isConnected) { + return this.whiteBoardService.updateBoard(data).pipe(timeout(5000)); + } else + return throwError(() => ({ + code: Status.UNAVAILABLE, + message: 'The service is currently unavailable', + })); + }), + catchError((error) => { + if ( + (error?.code === Status.UNAVAILABLE || + error?.name === 'TimeoutError') && + this.isConnected + ) + this.$connectionSubject.next(false); + return throwError(() => error); + }), + tap((data) => dataSubject.next(data)), + tap(() => dataSubject.complete()), + ) + .subscribe({ + error: (err) => dataSubject.error(err), + }); + try { + return await lastValueFrom(dataSubject.pipe(map((response) => response))); + } catch (error) { + this.logger.error(error.toString()); + } + } +} diff --git a/yarn.lock b/yarn.lock index bda9379..ef85925 100644 --- a/yarn.lock +++ b/yarn.lock @@ -7823,10 +7823,10 @@ watchpack@^2.4.0: glob-to-regexp "^0.4.1" graceful-fs "^4.1.2" -waterbus-proto@^1.1.5: - version "1.1.5" - resolved "https://registry.yarnpkg.com/waterbus-proto/-/waterbus-proto-1.1.5.tgz#5e95d06caa493c328c325e4b5857ad205a4e18d6" - integrity sha512-8OIPP/k1MeG2fixceQbKhGJk5Cff9dmjVWmzeSvYyy6jvM//WaXxGhpLZxnXOR6xESQe+9rlZGZcMF+w6Tqh8A== +waterbus-proto@^1.1.11: + version "1.1.11" + resolved "https://registry.yarnpkg.com/waterbus-proto/-/waterbus-proto-1.1.11.tgz#54d6ce0eade5def5601c2be5c9f015d7b636253b" + integrity sha512-/P8L370WlfdPPFTGPn0JBY99wDUcqS/T3beD5VU65j4N1TQV2oz8J6YzNILfH+9U38M8uO91lc1i1jOqDBspzQ== dependencies: "@grpc/grpc-js" "^1.9.3" rxjs "^7.8.1"