Skip to content

Commit

Permalink
Merge branch 'lukas/file-send' of github.com:livekit/client-sdk-js in…
Browse files Browse the repository at this point in the history
…to lukas/file-send
  • Loading branch information
lukasIO committed Jan 22, 2025
2 parents e39445a + 2cf7fd2 commit b9d8031
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 57 deletions.
74 changes: 38 additions & 36 deletions examples/demo/demo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ const appActions = {
const file = ($('file') as HTMLInputElement).files?.[0]!;
currentRoom?.localParticipant.sendFile(file, {
mimeType: file.type,
topic: 'test',
topic: 'welcome',
onProgress: (progress) => console.log('sending file, progress', Math.ceil(progress * 100)),
});
},
Expand Down Expand Up @@ -242,49 +242,51 @@ const appActions = {
participant.identity
}) to ${streamState.toString()}`,
);
})
.on(RoomEvent.TextStreamReceived, async (reader, participant) => {
const info = reader.info;
if (info.size && info.topic === 'chat') {
});

room.setTextStreamHandler(async (reader, participant) => {
const info = reader.info;
if (info.size) {
handleChatMessage(
{
id: info.id,
timestamp: info.timestamp,
message: await reader.readAll(),
},
room.getParticipantByIdentity(participant?.identity),
);
} else {
for await (const msg of reader) {
handleChatMessage(
{
id: info.id,
timestamp: info.timestamp,
message: await reader.readAll(),
message: msg.collected,
},
room.getParticipantByIdentity(participant?.identity),
);
} else {
for await (const msg of reader) {
handleChatMessage(
{
id: info.id,
timestamp: info.timestamp,
message: msg.collected,
},
room.getParticipantByIdentity(participant?.identity),
);
}
appendLog('text stream finished');
}
console.log('final info including close extensions', reader.info);
})
.on(RoomEvent.ByteStreamReceived, async (reader, participant) => {
const info = reader.info;

appendLog(`started to receive a file called "${info.name}" from ${participant?.identity}`);
reader.onProgress = (progress) => {
console.log(`"progress ${progress ? (progress * 100).toFixed(0) : 'undefined'}%`);
};
const result = new Blob(await reader.readAll(), { type: info.mimeType });
appendLog(`completely received file called "${info.name}" from ${participant?.identity}`);
const downloadLink = URL.createObjectURL(result);
const linkEl = document.createElement('a');
linkEl.href = downloadLink;
linkEl.innerText = info.name;
linkEl.setAttribute('download', info.name);
document.body.append(linkEl);
});
appendLog('text stream finished');
}
console.log('final info including close extensions', reader.info);
}, 'chat');

room.setByteStreamHandler(async (reader, participant) => {
const info = reader.info;

appendLog(`started to receive a file called "${info.name}" from ${participant?.identity}`);
reader.onProgress = (progress) => {
console.log(`"progress ${progress ? (progress * 100).toFixed(0) : 'undefined'}%`);
};
const result = new Blob(await reader.readAll(), { type: info.mimeType });
appendLog(`completely received file called "${info.name}" from ${participant?.identity}`);
const downloadLink = URL.createObjectURL(result);
const linkEl = document.createElement('a');
linkEl.href = downloadLink;
linkEl.innerText = info.name;
linkEl.setAttribute('download', info.name);
document.body.append(linkEl);
}, 'welcome');

try {
// read and set current key from input
Expand Down
67 changes: 49 additions & 18 deletions src/room/Room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ import { getBrowser } from '../utils/browserParser';
import DeviceManager from './DeviceManager';
import RTCEngine from './RTCEngine';
import { RegionUrlProvider } from './RegionUrlProvider';
import { ByteStreamReader, TextStreamReader } from './StreamReader';
import {
type ByteStreamHandler,
ByteStreamReader,
type TextStreamHandler,
TextStreamReader,
} from './StreamReader';
import {
audioDefaults,
publishDefaults,
Expand Down Expand Up @@ -189,6 +194,14 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
*/
private transcriptionReceivedTimes: Map<string, number>;

private byteStreamControllers = new Map<string, StreamController<DataStream_Chunk>>();

private textStreamControllers = new Map<string, StreamController<DataStream_Chunk>>();

private byteStreamHandlers = new Map<string, ByteStreamHandler>();

private textStreamHandlers = new Map<string, TextStreamHandler>();

/**
* Creates a new Room, the primary construct for a LiveKit session.
* @param options
Expand Down Expand Up @@ -261,6 +274,22 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
}
}

setTextStreamHandler(callback: TextStreamHandler | undefined, topic: string = '') {
if (!callback) {
this.textStreamHandlers.delete(topic);
} else {
this.textStreamHandlers.set(topic, callback);
}
}

setByteStreamHandler(callback: ByteStreamHandler | undefined, topic: string = '') {
if (!callback) {
this.byteStreamHandlers.delete(topic);
} else {
this.byteStreamHandlers.set(topic, callback);
}
}

/**
* @experimental
*/
Expand Down Expand Up @@ -1611,14 +1640,15 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
}
};

byteStreamControllers = new Map<string, StreamController<DataStream_Chunk>>();

textStreamControllers = new Map<string, StreamController<DataStream_Chunk>>();

private async handleStreamHeader(streamHeader: DataStream_Header, participantIdentity: string) {
if (streamHeader.contentHeader.case === 'byteHeader') {
if (this.listeners(RoomEvent.ByteStreamReceived).length === 0) {
this.log.debug('ignoring incoming file stream due to no listeners');
const streamHandlerCallback = this.byteStreamHandlers.get(streamHeader.topic);

if (!streamHandlerCallback) {
this.log.debug(
'ignoring incoming byte stream due to no handler for topic',
streamHeader.topic,
);
return;
}
let streamController: ReadableStreamDefaultController<DataStream_Chunk>;
Expand All @@ -1641,15 +1671,20 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
});
},
});

this.emit(
RoomEvent.ByteStreamReceived,
streamHandlerCallback(
new ByteStreamReader(info, stream, bigIntToNumber(streamHeader.totalLength)),
{ identity: participantIdentity },
{
identity: participantIdentity,
},
);
} else if (streamHeader.contentHeader.case === 'textHeader') {
if (this.listeners(RoomEvent.TextStreamReceived).length === 0) {
this.log.debug('ignoring incoming text stream due to no listeners');
const streamHandlerCallback = this.textStreamHandlers.get(streamHeader.topic);

if (!streamHandlerCallback) {
this.log.debug(
'ignoring incoming text stream due to no handler for topic',
streamHeader.topic,
);
return;
}
let streamController: ReadableStreamDefaultController<DataStream_Chunk>;
Expand All @@ -1672,9 +1707,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
});
},
});

this.emit(
RoomEvent.TextStreamReceived,
streamHandlerCallback(
new TextStreamReader(info, stream, bigIntToNumber(streamHeader.totalLength)),
{ identity: participantIdentity },
);
Expand Down Expand Up @@ -2520,6 +2553,4 @@ export type RoomEventCallbacks = {
chatMessage: (message: ChatMessage, participant?: RemoteParticipant | LocalParticipant) => void;
localTrackSubscribed: (publication: LocalTrackPublication, participant: LocalParticipant) => void;
metricsReceived: (metrics: MetricsBatch, participant?: Participant) => void;
byteStreamReceived: (reader: ByteStreamReader, participantInfo: { identity: string }) => void;
textStreamReceived: (reader: TextStreamReader, participantInfo: { identity: string }) => void;
};
10 changes: 10 additions & 0 deletions src/room/StreamReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,13 @@ export class TextStreamReader extends BaseStreamReader<TextStreamInfo> {
return latestString;
}
}

export type ByteStreamHandler = (
reader: ByteStreamReader,
participantInfo: { identity: string },
) => void;

export type TextStreamHandler = (
reader: TextStreamReader,
participantInfo: { identity: string },
) => void;
3 changes: 0 additions & 3 deletions src/room/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,6 @@ export enum RoomEvent {
* fired when the client receives connection metrics from other participants
*/
MetricsReceived = 'metricsReceived',

ByteStreamReceived = 'byteStreamReceived',
TextStreamReceived = 'textStreamReceived',
}

export enum ParticipantEvent {
Expand Down

0 comments on commit b9d8031

Please sign in to comment.