From 2cf7fd27427de8ca954e4143fc9c689a41f827ae Mon Sep 17 00:00:00 2001 From: lukasIO Date: Wed, 22 Jan 2025 16:28:02 +0100 Subject: [PATCH] Remove stream events and replace with topic callbacks (#1373) * remove stream events and replace with topic callbacks * update args order * update example for interop --- examples/demo/demo.ts | 74 +++++++++++++++++++++------------------- src/room/Room.ts | 67 ++++++++++++++++++++++++++---------- src/room/StreamReader.ts | 10 ++++++ src/room/events.ts | 3 -- 4 files changed, 97 insertions(+), 57 deletions(-) diff --git a/examples/demo/demo.ts b/examples/demo/demo.ts index 0518e76e19..3331cf97eb 100644 --- a/examples/demo/demo.ts +++ b/examples/demo/demo.ts @@ -78,7 +78,7 @@ const appActions = { const file = ($('file') as HTMLInputElement).files?.[0]!; currentRoom?.localParticipant.sendFile(file, { mimeType: file.type, - topic: 'test', + topic: 'welcome', onProgress: (progress) => console.log('sending file, progress', Math.ceil(progress * 100)), }); }, @@ -242,49 +242,51 @@ const appActions = { participant.identity }) to ${streamState.toString()}`, ); - }) - .on(RoomEvent.TextStreamReceived, async (reader, participant) => { - const info = reader.info; - if (info.size && info.topic === 'chat') { + }); + + room.setTextStreamHandler(async (reader, participant) => { + const info = reader.info; + if (info.size) { + handleChatMessage( + { + id: info.id, + timestamp: info.timestamp, + message: await reader.readAll(), + }, + room.getParticipantByIdentity(participant?.identity), + ); + } else { + for await (const msg of reader) { handleChatMessage( { id: info.id, timestamp: info.timestamp, - message: await reader.readAll(), + message: msg.collected, }, room.getParticipantByIdentity(participant?.identity), ); - } else { - for await (const msg of reader) { - handleChatMessage( - { - id: info.id, - timestamp: info.timestamp, - message: msg.collected, - }, - room.getParticipantByIdentity(participant?.identity), - ); - } - appendLog('text stream finished'); } - console.log('final info including close extensions', reader.info); - }) - .on(RoomEvent.ByteStreamReceived, async (reader, participant) => { - const info = reader.info; - - appendLog(`started to receive a file called "${info.name}" from ${participant?.identity}`); - reader.onProgress = (progress) => { - console.log(`"progress ${progress ? (progress * 100).toFixed(0) : 'undefined'}%`); - }; - const result = new Blob(await reader.readAll(), { type: info.mimeType }); - appendLog(`completely received file called "${info.name}" from ${participant?.identity}`); - const downloadLink = URL.createObjectURL(result); - const linkEl = document.createElement('a'); - linkEl.href = downloadLink; - linkEl.innerText = info.name; - linkEl.setAttribute('download', info.name); - document.body.append(linkEl); - }); + appendLog('text stream finished'); + } + console.log('final info including close extensions', reader.info); + }, 'chat'); + + room.setByteStreamHandler(async (reader, participant) => { + const info = reader.info; + + appendLog(`started to receive a file called "${info.name}" from ${participant?.identity}`); + reader.onProgress = (progress) => { + console.log(`"progress ${progress ? (progress * 100).toFixed(0) : 'undefined'}%`); + }; + const result = new Blob(await reader.readAll(), { type: info.mimeType }); + appendLog(`completely received file called "${info.name}" from ${participant?.identity}`); + const downloadLink = URL.createObjectURL(result); + const linkEl = document.createElement('a'); + linkEl.href = downloadLink; + linkEl.innerText = info.name; + linkEl.setAttribute('download', info.name); + document.body.append(linkEl); + }, 'welcome'); try { // read and set current key from input diff --git a/src/room/Room.ts b/src/room/Room.ts index 4c4e86b6b5..06aa4ea447 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -48,7 +48,12 @@ import { getBrowser } from '../utils/browserParser'; import DeviceManager from './DeviceManager'; import RTCEngine from './RTCEngine'; import { RegionUrlProvider } from './RegionUrlProvider'; -import { ByteStreamReader, TextStreamReader } from './StreamReader'; +import { + type ByteStreamHandler, + ByteStreamReader, + type TextStreamHandler, + TextStreamReader, +} from './StreamReader'; import { audioDefaults, publishDefaults, @@ -189,6 +194,14 @@ class Room extends (EventEmitter as new () => TypedEmitter) */ private transcriptionReceivedTimes: Map; + private byteStreamControllers = new Map>(); + + private textStreamControllers = new Map>(); + + private byteStreamHandlers = new Map(); + + private textStreamHandlers = new Map(); + /** * Creates a new Room, the primary construct for a LiveKit session. * @param options @@ -261,6 +274,22 @@ class Room extends (EventEmitter as new () => TypedEmitter) } } + setTextStreamHandler(callback: TextStreamHandler | undefined, topic: string = '') { + if (!callback) { + this.textStreamHandlers.delete(topic); + } else { + this.textStreamHandlers.set(topic, callback); + } + } + + setByteStreamHandler(callback: ByteStreamHandler | undefined, topic: string = '') { + if (!callback) { + this.byteStreamHandlers.delete(topic); + } else { + this.byteStreamHandlers.set(topic, callback); + } + } + /** * @experimental */ @@ -1607,14 +1636,15 @@ class Room extends (EventEmitter as new () => TypedEmitter) } }; - byteStreamControllers = new Map>(); - - textStreamControllers = new Map>(); - private async handleStreamHeader(streamHeader: DataStream_Header, participantIdentity: string) { if (streamHeader.contentHeader.case === 'byteHeader') { - if (this.listeners(RoomEvent.ByteStreamReceived).length === 0) { - this.log.debug('ignoring incoming file stream due to no listeners'); + const streamHandlerCallback = this.byteStreamHandlers.get(streamHeader.topic); + + if (!streamHandlerCallback) { + this.log.debug( + 'ignoring incoming byte stream due to no handler for topic', + streamHeader.topic, + ); return; } let streamController: ReadableStreamDefaultController; @@ -1637,15 +1667,20 @@ class Room extends (EventEmitter as new () => TypedEmitter) }); }, }); - - this.emit( - RoomEvent.ByteStreamReceived, + streamHandlerCallback( new ByteStreamReader(info, stream, bigIntToNumber(streamHeader.totalLength)), - { identity: participantIdentity }, + { + identity: participantIdentity, + }, ); } else if (streamHeader.contentHeader.case === 'textHeader') { - if (this.listeners(RoomEvent.TextStreamReceived).length === 0) { - this.log.debug('ignoring incoming text stream due to no listeners'); + const streamHandlerCallback = this.textStreamHandlers.get(streamHeader.topic); + + if (!streamHandlerCallback) { + this.log.debug( + 'ignoring incoming text stream due to no handler for topic', + streamHeader.topic, + ); return; } let streamController: ReadableStreamDefaultController; @@ -1668,9 +1703,7 @@ class Room extends (EventEmitter as new () => TypedEmitter) }); }, }); - - this.emit( - RoomEvent.TextStreamReceived, + streamHandlerCallback( new TextStreamReader(info, stream, bigIntToNumber(streamHeader.totalLength)), { identity: participantIdentity }, ); @@ -2516,6 +2549,4 @@ export type RoomEventCallbacks = { chatMessage: (message: ChatMessage, participant?: RemoteParticipant | LocalParticipant) => void; localTrackSubscribed: (publication: LocalTrackPublication, participant: LocalParticipant) => void; metricsReceived: (metrics: MetricsBatch, participant?: Participant) => void; - byteStreamReceived: (reader: ByteStreamReader, participantInfo: { identity: string }) => void; - textStreamReceived: (reader: TextStreamReader, participantInfo: { identity: string }) => void; }; diff --git a/src/room/StreamReader.ts b/src/room/StreamReader.ts index a15805bf3a..5b24bd66dc 100644 --- a/src/room/StreamReader.ts +++ b/src/room/StreamReader.ts @@ -165,3 +165,13 @@ export class TextStreamReader extends BaseStreamReader { return latestString; } } + +export type ByteStreamHandler = ( + reader: ByteStreamReader, + participantInfo: { identity: string }, +) => void; + +export type TextStreamHandler = ( + reader: TextStreamReader, + participantInfo: { identity: string }, +) => void; diff --git a/src/room/events.ts b/src/room/events.ts index 58f3fb3110..38d9a6e09d 100644 --- a/src/room/events.ts +++ b/src/room/events.ts @@ -335,9 +335,6 @@ export enum RoomEvent { * fired when the client receives connection metrics from other participants */ MetricsReceived = 'metricsReceived', - - ByteStreamReceived = 'byteStreamReceived', - TextStreamReceived = 'textStreamReceived', } export enum ParticipantEvent {