From 1ebdf25f7ac659960f1f54f2a32cf6d1c0111509 Mon Sep 17 00:00:00 2001 From: Long Chen Date: Wed, 22 Jan 2025 18:56:40 +0800 Subject: [PATCH 01/15] add RoomInput --- .../livekit/agents/pipeline/room_io.py | 176 ++++++++++++++++++ 1 file changed, 176 insertions(+) create mode 100644 livekit-agents/livekit/agents/pipeline/room_io.py diff --git a/livekit-agents/livekit/agents/pipeline/room_io.py b/livekit-agents/livekit/agents/pipeline/room_io.py new file mode 100644 index 000000000..a1e749bd9 --- /dev/null +++ b/livekit-agents/livekit/agents/pipeline/room_io.py @@ -0,0 +1,176 @@ +from __future__ import annotations + +import asyncio +from typing import Optional + +from livekit import rtc + +from ..log import logger +from ..utils import aio +from .io import AudioStream, VideoStream + + +class RoomInput: + """Creates video and audio streams from a remote participant in a LiveKit room""" + + class _RemoteTrackStreamer: + """Manages streaming from a remote track to a channel""" + + def __init__( + self, + source: rtc.TrackSource.ValueType, + enabled: bool = False, + sample_rate: int | None = None, + ) -> None: + self.source = source + self.enabled = enabled + self.sample_rate = sample_rate + + self.track: rtc.RemoteTrack | None = None + self.task: asyncio.Task | None = None + self._data_ch: aio.Chan[rtc.AudioFrame | rtc.VideoFrame] | None = None + + if enabled: + self._data_ch = aio.Chan() + + @property + def data_ch(self) -> aio.Chan | None: + return self._data_ch + + def setup(self, track: rtc.RemoteTrack) -> None: + """Set up streaming for a new track""" + if track == self.track: + return + + if self.task is not None: + self.task.cancel() + + assert self._data_ch is not None + self.track = track + stream = ( + rtc.AudioStream(track, sample_rate=self.sample_rate) + if self.source == rtc.TrackSource.SOURCE_MICROPHONE + else rtc.VideoStream(track) + ) + self.task = asyncio.create_task(self._stream_frames(stream)) + + async def _stream_frames( + self, stream: rtc.AudioStream | rtc.VideoStream + ) -> None: + assert self._data_ch is not None + async for event in stream: + self._data_ch.send_nowait(event.frame) + + def __init__( + self, + room: rtc.Room, + participant_identity: Optional[str] = None, + *, + audio_enabled: bool = True, + video_enabled: bool = False, + ) -> None: + """ + Args: + room: The LiveKit room to get streams from + participant_identity: Optional identity of the participant to get streams from. + If None, will use the first participant that joins. + audio_enabled: Whether to enable audio input + video_enabled: Whether to enable video input + """ + self._room = room + self._expected_identity = participant_identity + self._participant: rtc.RemoteParticipant | None = None + self._closed = False + + # set up track streamers + self._audio_streamer = self._RemoteTrackStreamer( + rtc.TrackSource.SOURCE_MICROPHONE, enabled=audio_enabled, sample_rate=16000 + ) + self._video_streamer = self._RemoteTrackStreamer( + rtc.TrackSource.SOURCE_CAMERA, enabled=video_enabled + ) + + self._participant_ready = asyncio.Event() + self._room.on("participant_connected", self._on_participant_connected) + self._room.on("track_published", self._subscribe_to_tracks) + self._room.on("track_subscribed", self._subscribe_to_tracks) + + # try to find participant + if self._expected_identity is not None: + participant = self._room.remote_participants.get(self._expected_identity) + if participant is not None: + self._link_participant(participant) + else: + for participant in self._room.remote_participants.values(): + self._link_participant(participant) + if self._participant: + break + + async def wait_for_participant(self) -> rtc.RemoteParticipant: + await self._participant_ready.wait() + assert self._participant is not None + return self._participant + + @property + def audio(self) -> AudioStream | None: + return self._audio_streamer.data_ch + + @property + def video(self) -> VideoStream | None: + return self._video_streamer.data_ch + + def _link_participant(self, participant: rtc.RemoteParticipant) -> None: + if ( + self._expected_identity is not None + and participant.identity != self._expected_identity + ): + return + + self._participant = participant + self._participant_ready.set() + + # set up tracks + self._subscribe_to_tracks() + + def _on_participant_connected(self, participant: rtc.RemoteParticipant) -> None: + if self._participant is not None: + return + self._link_participant(participant) + + def _subscribe_to_tracks(self, *args, **kwargs) -> None: + if self._participant is None: + return + + for publication in self._participant.track_publications.values(): + # skip tracks we don't care about + streamer = None + if publication.source == rtc.TrackSource.SOURCE_MICROPHONE: + streamer = self._audio_streamer + elif publication.source == rtc.TrackSource.SOURCE_CAMERA: + streamer = self._video_streamer + + if streamer is None or not streamer.enabled: + continue + + # subscribe and setup streaming + if not publication.subscribed: + publication.set_subscribed(True) + + track: rtc.RemoteTrack | None = publication.track + if track is not None: + streamer.setup(track) + + async def aclose(self) -> None: + if self._closed: + raise RuntimeError("RoomInput already closed") + + self._closed = True + self._room.off("participant_connected", self._on_participant_connected) + self._room.off("track_published", self._subscribe_to_tracks) + self._room.off("track_subscribed", self._subscribe_to_tracks) + self._participant = None + + # Cancel stream tasks + for streamer in [self._audio_streamer, self._video_streamer]: + if streamer.task is not None: + await aio.graceful_cancel(streamer.task) From 33cb115508366fdffce7cb4d43cc0124737f107f Mon Sep 17 00:00:00 2001 From: Long Chen Date: Wed, 22 Jan 2025 20:06:07 +0800 Subject: [PATCH 02/15] add room audio sink --- .../livekit/agents/pipeline/room_io.py | 87 ++++++++++++++++++- 1 file changed, 85 insertions(+), 2 deletions(-) diff --git a/livekit-agents/livekit/agents/pipeline/room_io.py b/livekit-agents/livekit/agents/pipeline/room_io.py index a1e749bd9..3414f7b5e 100644 --- a/livekit-agents/livekit/agents/pipeline/room_io.py +++ b/livekit-agents/livekit/agents/pipeline/room_io.py @@ -5,9 +5,8 @@ from livekit import rtc -from ..log import logger from ..utils import aio -from .io import AudioStream, VideoStream +from .io import AudioSink, AudioStream, VideoStream class RoomInput: @@ -174,3 +173,87 @@ async def aclose(self) -> None: for streamer in [self._audio_streamer, self._video_streamer]: if streamer.task is not None: await aio.graceful_cancel(streamer.task) + + +class RoomAudioSink(AudioSink): + """AudioSink implementation that publishes audio to a LiveKit room using a LocalAudioTrack""" + + def __init__( + self, room: rtc.Room, sample_rate: int = 24000, num_channels: int = 1 + ) -> None: + """Initialize the RoomAudioSink + + Args: + room: The LiveKit room to publish audio to + sample_rate: Sample rate of the audio in Hz + num_channels: Number of audio channels + """ + super().__init__(sample_rate=sample_rate) + self._room = room + self._num_channels = num_channels + + # Create audio source and track + self._audio_source = rtc.AudioSource( + sample_rate=sample_rate, num_channels=num_channels + ) + self._track = rtc.LocalAudioTrack.create_audio_track( + "assistant_voice", self._audio_source + ) + + self._publication: rtc.LocalTrackPublication | None = None + self._publish_task: asyncio.Task | None = None + self._pushed_duration: float | None = None + + async def start(self) -> None: + """Start publishing the audio track to the room""" + if self._publication: + return + + self._publication = await self._room.local_participant.publish_track( + self._track, + rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE), + ) + + # is this necessary? + await self._publication.wait_for_subscription() + + async def capture_frame(self, frame: rtc.AudioFrame) -> None: + """Capture an audio frame and publish it to the room + + Args: + frame: The audio frame to publish + """ + await super().capture_frame(frame) + + if self._pushed_duration is None: + self._pushed_duration = 0.0 + + self._pushed_duration += frame.duration + await self._audio_source.capture_frame(frame) + + def flush(self) -> None: + """Flush the current audio segment and notify when complete""" + super().flush() + + if self._pushed_duration is not None: + # Notify that playback finished + self.on_playback_finished( + playback_position=self._pushed_duration, interrupted=False + ) + self._pushed_duration = None + + def clear_buffer(self) -> None: + """Clear the audio buffer immediately""" + # Clear the buffer + self._audio_source.clear_queue() + + if self._pushed_duration is not None: + # Notify that playback was interrupted + self.on_playback_finished( + playback_position=self._pushed_duration, interrupted=True + ) + self._pushed_duration = None + + async def aclose(self) -> None: + """Clean up resources""" + await self._audio_source.aclose() From 89dced241c4743bcb561f8e873bb48793e3bbfe6 Mon Sep 17 00:00:00 2001 From: Long Chen Date: Wed, 22 Jan 2025 22:33:24 +0800 Subject: [PATCH 03/15] wip testing the agent --- livekit-agents/livekit/agents/llm/chat_context.py | 8 +++++++- .../livekit/agents/pipeline/speech_handle.py | 13 ++++++++++++- .../livekit/plugins/openai/__init__.py | 4 ++-- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/livekit-agents/livekit/agents/llm/chat_context.py b/livekit-agents/livekit/agents/llm/chat_context.py index bf9fa2c7b..711df46fb 100644 --- a/livekit-agents/livekit/agents/llm/chat_context.py +++ b/livekit-agents/livekit/agents/llm/chat_context.py @@ -21,7 +21,7 @@ ) from livekit import rtc -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, ConfigDict from typing_extensions import TypeAlias from .. import utils @@ -81,12 +81,18 @@ class ImageContent(BaseModel): Currently only supported by OpenAI (see https://platform.openai.com/docs/guides/vision?lang=node#low-or-high-fidelity-image-understanding) """ + # temporary fix for pydantic + model_config = ConfigDict(arbitrary_types_allowed=True) + class AudioContent(BaseModel): type: Literal["audio_content"] = Field(default="audio_content") frame: list[rtc.AudioFrame] transcript: Optional[str] = None + # temporary fix for pydantic before rtc.AudioFrame is supported + model_config = ConfigDict(arbitrary_types_allowed=True) + class ChatMessage(BaseModel): id: str = Field(default_factory=lambda: utils.shortuuid("item_")) diff --git a/livekit-agents/livekit/agents/pipeline/speech_handle.py b/livekit-agents/livekit/agents/pipeline/speech_handle.py index bd3971291..8ae63ece0 100644 --- a/livekit-agents/livekit/agents/pipeline/speech_handle.py +++ b/livekit-agents/livekit/agents/pipeline/speech_handle.py @@ -79,6 +79,17 @@ def _mark_playout_done(self) -> None: self._playout_done_fut.set_result(None) async def wait_until_interrupted(self, aw: list[Awaitable]) -> None: + temp_tasks = [] + tasks = [] + for task in aw: + if not isinstance(task, asyncio.Task): + task = asyncio.create_task(task) + temp_tasks.append(task) + tasks.append(task) + await asyncio.wait( - [*aw, self._interrupt_fut], return_when=asyncio.FIRST_COMPLETED + [*tasks, self._interrupt_fut], return_when=asyncio.FIRST_COMPLETED ) + for task in temp_tasks: + if not task.done(): + task.cancel() diff --git a/livekit-plugins/livekit-plugins-openai/livekit/plugins/openai/__init__.py b/livekit-plugins/livekit-plugins-openai/livekit/plugins/openai/__init__.py index 1a6b7c00d..ae870a241 100644 --- a/livekit-plugins/livekit-plugins-openai/livekit/plugins/openai/__init__.py +++ b/livekit-plugins/livekit-plugins-openai/livekit/plugins/openai/__init__.py @@ -13,7 +13,7 @@ # limitations under the License. -from . import beta, realtime +from . import realtime from .embeddings import EmbeddingData, create_embeddings from .llm import LLM, LLMStream from .models import TTSModels, TTSVoices, WhisperModels @@ -27,7 +27,7 @@ "LLM", "LLMStream", "WhisperModels", - "beta", + # "beta", "TTSModels", "TTSVoices", "create_embeddings", From c26e5e500f1ebfb5556efdcb100e57d46bc0b60e Mon Sep 17 00:00:00 2001 From: Long Chen Date: Thu, 23 Jan 2025 11:36:24 +0800 Subject: [PATCH 04/15] add room io example --- examples/roomio_worker.py | 55 +++++ livekit-agents/livekit/agents/cli/cli.py | 2 +- .../livekit/agents/pipeline/room_io.py | 226 +++++++++--------- 3 files changed, 170 insertions(+), 113 deletions(-) create mode 100644 examples/roomio_worker.py diff --git a/examples/roomio_worker.py b/examples/roomio_worker.py new file mode 100644 index 000000000..2cbdb3d00 --- /dev/null +++ b/examples/roomio_worker.py @@ -0,0 +1,55 @@ +import logging + +from dotenv import load_dotenv +from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, WorkerType, cli +from livekit.agents.pipeline import AgentTask, ChatCLI, PipelineAgent +from livekit.agents.pipeline.room_io import RoomAudioSink, RoomInput +from livekit.agents.pipeline.io import PlaybackFinishedEvent +from livekit.plugins import cartesia, deepgram, openai, silero + +logger = logging.getLogger("my-worker") +logger.setLevel(logging.INFO) + +load_dotenv() + + +async def entrypoint(ctx: JobContext): + await ctx.connect() + + agent = PipelineAgent( + task=AgentTask( + instructions="Talk to me!", + llm=openai.realtime.RealtimeModel(), + ) + ) + agent.start() + + # # start a chat inside the CLI + # chat_cli = ChatCLI(agent) + # await chat_cli.run() + + room_input = RoomInput(ctx.room, audio_enabled=True, video_enabled=False) + audio_output = RoomAudioSink(ctx.room, sample_rate=24000, num_channels=1) + + agent.input.audio = room_input.audio + agent.output.audio = audio_output + + # TODO: the interrupted flag is not set correctly + @audio_output.on("playback_finished") + def on_playback_finished(ev: PlaybackFinishedEvent) -> None: + logger.info( + "playback_finished", + extra={ + "playback_position": ev.playback_position, + "interrupted": ev.interrupted, + }, + ) + + await room_input.wait_for_participant() + await audio_output.start() + + +if __name__ == "__main__": + # WorkerType.ROOM is the default worker type which will create an agent for every room. + # You can also use WorkerType.PUBLISHER to create a single agent for all participants that publish a track. + cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, worker_type=WorkerType.ROOM)) diff --git a/livekit-agents/livekit/agents/cli/cli.py b/livekit-agents/livekit/agents/cli/cli.py index a29ef17e8..afee008b2 100644 --- a/livekit-agents/livekit/agents/cli/cli.py +++ b/livekit-agents/livekit/agents/cli/cli.py @@ -116,7 +116,7 @@ def dev( asyncio_debug=asyncio_debug, watch=watch, drain_timeout=0, - register=False, + register=True, ) _run_dev(args) diff --git a/livekit-agents/livekit/agents/pipeline/room_io.py b/livekit-agents/livekit/agents/pipeline/room_io.py index 3414f7b5e..b33c15a3f 100644 --- a/livekit-agents/livekit/agents/pipeline/room_io.py +++ b/livekit-agents/livekit/agents/pipeline/room_io.py @@ -1,7 +1,8 @@ from __future__ import annotations import asyncio -from typing import Optional +from functools import partial +from typing import Callable, Optional from livekit import rtc @@ -9,24 +10,99 @@ from .io import AudioSink, AudioStream, VideoStream +class RoomAudioSink(AudioSink): + """AudioSink implementation that publishes audio to a LiveKit room""" + + def __init__( + self, room: rtc.Room, *, sample_rate: int = 24000, num_channels: int = 1 + ) -> None: + """Initialize the RoomAudioSink + + Args: + room: The LiveKit room to publish audio to + sample_rate: Sample rate of the audio in Hz + num_channels: Number of audio channels + """ + super().__init__(sample_rate=sample_rate) + self._room = room + + # Create audio source and track + self._audio_source = rtc.AudioSource( + sample_rate=sample_rate, num_channels=num_channels + ) + self._track = rtc.LocalAudioTrack.create_audio_track( + "assistant_voice", self._audio_source + ) + + self._publication: rtc.LocalTrackPublication | None = None + self._publish_task: asyncio.Task | None = None + self._pushed_duration: float | None = None + + async def start(self) -> None: + """Start publishing the audio track to the room""" + if self._publication: + return + + self._publication = await self._room.local_participant.publish_track( + self._track, + rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE), + ) + + # is this necessary? + await self._publication.wait_for_subscription() + + async def capture_frame(self, frame: rtc.AudioFrame) -> None: + """Capture an audio frame and publish it to the room""" + await super().capture_frame(frame) + + if self._pushed_duration is None: + self._pushed_duration = 0.0 + + self._pushed_duration += frame.duration + await self._audio_source.capture_frame(frame) + + def flush(self) -> None: + """Flush the current audio segment and notify when complete""" + super().flush() + + if self._pushed_duration is not None: + # Notify that playback finished + self.on_playback_finished( + playback_position=self._pushed_duration, interrupted=False + ) + self._pushed_duration = None + + def clear_buffer(self) -> None: + """Clear the audio buffer immediately""" + # Clear the buffer + self._audio_source.clear_queue() + + if self._pushed_duration is not None: + # Notify that playback was interrupted + self.on_playback_finished( + playback_position=self._pushed_duration, interrupted=True + ) + self._pushed_duration = None + + class RoomInput: """Creates video and audio streams from a remote participant in a LiveKit room""" - class _RemoteTrackStreamer: - """Manages streaming from a remote track to a channel""" + class _RemoteTrackHandler: + """Manages streaming from a remote track to a aio.Chan""" def __init__( self, - source: rtc.TrackSource.ValueType, + track_to_stream: Callable[ + [rtc.RemoteTrack], rtc.AudioStream | rtc.VideoStream + ], enabled: bool = False, - sample_rate: int | None = None, ) -> None: - self.source = source + self.track_to_stream = track_to_stream self.enabled = enabled - self.sample_rate = sample_rate self.track: rtc.RemoteTrack | None = None - self.task: asyncio.Task | None = None + self.stream_task: asyncio.Task | None = None self._data_ch: aio.Chan[rtc.AudioFrame | rtc.VideoFrame] | None = None if enabled: @@ -41,17 +117,13 @@ def setup(self, track: rtc.RemoteTrack) -> None: if track == self.track: return - if self.task is not None: - self.task.cancel() + if self.stream_task is not None: + self.stream_task.cancel() assert self._data_ch is not None self.track = track - stream = ( - rtc.AudioStream(track, sample_rate=self.sample_rate) - if self.source == rtc.TrackSource.SOURCE_MICROPHONE - else rtc.VideoStream(track) - ) - self.task = asyncio.create_task(self._stream_frames(stream)) + stream = self.track_to_stream(track) + self.stream_task = asyncio.create_task(self._stream_frames(stream)) async def _stream_frames( self, stream: rtc.AudioStream | rtc.VideoStream @@ -67,6 +139,8 @@ def __init__( *, audio_enabled: bool = True, video_enabled: bool = False, + audio_sample_rate: int = 16000, + audio_num_channels: int = 1, ) -> None: """ Args: @@ -75,6 +149,8 @@ def __init__( If None, will use the first participant that joins. audio_enabled: Whether to enable audio input video_enabled: Whether to enable video input + audio_sample_rate: Sample rate of the audio in Hz + audio_num_channels: Number of audio channels """ self._room = room self._expected_identity = participant_identity @@ -82,11 +158,17 @@ def __init__( self._closed = False # set up track streamers - self._audio_streamer = self._RemoteTrackStreamer( - rtc.TrackSource.SOURCE_MICROPHONE, enabled=audio_enabled, sample_rate=16000 + self._audio_streamer = self._RemoteTrackHandler( + track_to_stream=partial( + rtc.AudioStream, + sample_rate=audio_sample_rate, + num_channels=audio_num_channels, + ), + enabled=audio_enabled, ) - self._video_streamer = self._RemoteTrackStreamer( - rtc.TrackSource.SOURCE_CAMERA, enabled=video_enabled + self._video_streamer = self._RemoteTrackHandler( + track_to_stream=rtc.VideoStream, + enabled=video_enabled, ) self._participant_ready = asyncio.Event() @@ -136,17 +218,21 @@ def _on_participant_connected(self, participant: rtc.RemoteParticipant) -> None: return self._link_participant(participant) - def _subscribe_to_tracks(self, *args, **kwargs) -> None: + def _subscribe_to_tracks(self, *args) -> None: if self._participant is None: return + if args and isinstance(args[-1], rtc.RemoteParticipant): + # track_published: (publication, participant) + # track_subscribed: (track, publication, participant) + if args[-1].identity != self._participant.identity: + return for publication in self._participant.track_publications.values(): # skip tracks we don't care about - streamer = None - if publication.source == rtc.TrackSource.SOURCE_MICROPHONE: - streamer = self._audio_streamer - elif publication.source == rtc.TrackSource.SOURCE_CAMERA: - streamer = self._video_streamer + streamer = { + rtc.TrackSource.SOURCE_MICROPHONE: self._audio_streamer, + rtc.TrackSource.SOURCE_CAMERA: self._video_streamer, + }.get(publication.source) if streamer is None or not streamer.enabled: continue @@ -171,89 +257,5 @@ async def aclose(self) -> None: # Cancel stream tasks for streamer in [self._audio_streamer, self._video_streamer]: - if streamer.task is not None: - await aio.graceful_cancel(streamer.task) - - -class RoomAudioSink(AudioSink): - """AudioSink implementation that publishes audio to a LiveKit room using a LocalAudioTrack""" - - def __init__( - self, room: rtc.Room, sample_rate: int = 24000, num_channels: int = 1 - ) -> None: - """Initialize the RoomAudioSink - - Args: - room: The LiveKit room to publish audio to - sample_rate: Sample rate of the audio in Hz - num_channels: Number of audio channels - """ - super().__init__(sample_rate=sample_rate) - self._room = room - self._num_channels = num_channels - - # Create audio source and track - self._audio_source = rtc.AudioSource( - sample_rate=sample_rate, num_channels=num_channels - ) - self._track = rtc.LocalAudioTrack.create_audio_track( - "assistant_voice", self._audio_source - ) - - self._publication: rtc.LocalTrackPublication | None = None - self._publish_task: asyncio.Task | None = None - self._pushed_duration: float | None = None - - async def start(self) -> None: - """Start publishing the audio track to the room""" - if self._publication: - return - - self._publication = await self._room.local_participant.publish_track( - self._track, - rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE), - ) - - # is this necessary? - await self._publication.wait_for_subscription() - - async def capture_frame(self, frame: rtc.AudioFrame) -> None: - """Capture an audio frame and publish it to the room - - Args: - frame: The audio frame to publish - """ - await super().capture_frame(frame) - - if self._pushed_duration is None: - self._pushed_duration = 0.0 - - self._pushed_duration += frame.duration - await self._audio_source.capture_frame(frame) - - def flush(self) -> None: - """Flush the current audio segment and notify when complete""" - super().flush() - - if self._pushed_duration is not None: - # Notify that playback finished - self.on_playback_finished( - playback_position=self._pushed_duration, interrupted=False - ) - self._pushed_duration = None - - def clear_buffer(self) -> None: - """Clear the audio buffer immediately""" - # Clear the buffer - self._audio_source.clear_queue() - - if self._pushed_duration is not None: - # Notify that playback was interrupted - self.on_playback_finished( - playback_position=self._pushed_duration, interrupted=True - ) - self._pushed_duration = None - - async def aclose(self) -> None: - """Clean up resources""" - await self._audio_source.aclose() + if streamer.stream_task is not None: + await aio.graceful_cancel(streamer.stream_task) From 9a39e53c92c9ab4891442769ceb27d125b010fc5 Mon Sep 17 00:00:00 2001 From: Long Chen Date: Thu, 23 Jan 2025 17:57:22 +0800 Subject: [PATCH 05/15] fix wait for playout --- .../livekit/agents/pipeline/room_io.py | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/livekit-agents/livekit/agents/pipeline/room_io.py b/livekit-agents/livekit/agents/pipeline/room_io.py index b33c15a3f..3c80fd633 100644 --- a/livekit-agents/livekit/agents/pipeline/room_io.py +++ b/livekit-agents/livekit/agents/pipeline/room_io.py @@ -1,5 +1,3 @@ -from __future__ import annotations - import asyncio from functools import partial from typing import Callable, Optional @@ -43,6 +41,7 @@ async def start(self) -> None: if self._publication: return + # TODO: handle reconnected, do we need to cancel and re-publish? self._publication = await self._room.local_participant.publish_track( self._track, rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE), @@ -66,24 +65,28 @@ def flush(self) -> None: super().flush() if self._pushed_duration is not None: - # Notify that playback finished - self.on_playback_finished( - playback_position=self._pushed_duration, interrupted=False - ) + self._notify_playback_finished(self._pushed_duration, interrupted=False) self._pushed_duration = None def clear_buffer(self) -> None: """Clear the audio buffer immediately""" - # Clear the buffer self._audio_source.clear_queue() if self._pushed_duration is not None: - # Notify that playback was interrupted - self.on_playback_finished( - playback_position=self._pushed_duration, interrupted=True - ) + self._notify_playback_finished(self._pushed_duration, interrupted=True) self._pushed_duration = None + def _notify_playback_finished( + self, playback_position: float, interrupted: bool + ) -> None: + """Wait for the audio to be played out and notify when complete""" + playout_task = asyncio.create_task(self._audio_source.wait_for_playout()) + playout_task.add_done_callback( + lambda _: self.on_playback_finished( + playback_position=playback_position, interrupted=interrupted + ) + ) + class RoomInput: """Creates video and audio streams from a remote participant in a LiveKit room""" @@ -175,6 +178,7 @@ def __init__( self._room.on("participant_connected", self._on_participant_connected) self._room.on("track_published", self._subscribe_to_tracks) self._room.on("track_subscribed", self._subscribe_to_tracks) + self._room.on("reconnected", self._subscribe_to_tracks) # try to find participant if self._expected_identity is not None: From 5c30bee5d6467fedad3ab7a26c14e35bbde7e84b Mon Sep 17 00:00:00 2001 From: Long Chen Date: Thu, 23 Jan 2025 22:07:52 +0800 Subject: [PATCH 06/15] add enable and disable the room input --- examples/roomio_worker.py | 11 +- .../livekit/agents/pipeline/room_io.py | 167 +++++++++++------- 2 files changed, 112 insertions(+), 66 deletions(-) diff --git a/examples/roomio_worker.py b/examples/roomio_worker.py index 2cbdb3d00..a88e0aaf6 100644 --- a/examples/roomio_worker.py +++ b/examples/roomio_worker.py @@ -1,10 +1,11 @@ import logging from dotenv import load_dotenv +from livekit import rtc from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, WorkerType, cli from livekit.agents.pipeline import AgentTask, ChatCLI, PipelineAgent -from livekit.agents.pipeline.room_io import RoomAudioSink, RoomInput from livekit.agents.pipeline.io import PlaybackFinishedEvent +from livekit.agents.pipeline.room_io import RoomAudioSink, RoomInput, RoomInputOptions from livekit.plugins import cartesia, deepgram, openai, silero logger = logging.getLogger("my-worker") @@ -28,7 +29,13 @@ async def entrypoint(ctx: JobContext): # chat_cli = ChatCLI(agent) # await chat_cli.run() - room_input = RoomInput(ctx.room, audio_enabled=True, video_enabled=False) + room_input = RoomInput( + ctx.room, + options=RoomInputOptions( + subscribe_audio=True, + subscribe_video=False, + ), + ) audio_output = RoomAudioSink(ctx.room, sample_rate=24000, num_channels=1) agent.input.audio = room_input.audio diff --git a/livekit-agents/livekit/agents/pipeline/room_io.py b/livekit-agents/livekit/agents/pipeline/room_io.py index 3c80fd633..d7aeac835 100644 --- a/livekit-agents/livekit/agents/pipeline/room_io.py +++ b/livekit-agents/livekit/agents/pipeline/room_io.py @@ -1,6 +1,6 @@ import asyncio -from functools import partial -from typing import Callable, Optional +from dataclasses import dataclass +from typing import Optional from livekit import rtc @@ -41,7 +41,7 @@ async def start(self) -> None: if self._publication: return - # TODO: handle reconnected, do we need to cancel and re-publish? + # TODO: handle reconnected, do we need to cancel and re-publish? seems no self._publication = await self._room.local_participant.publish_track( self._track, rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE), @@ -88,91 +88,115 @@ def _notify_playback_finished( ) +@dataclass +class RoomInputOptions: + subscribe_audio: bool = True + """Whether to subscribe to audio""" + subscribe_video: bool = False + """Whether to subscribe to video""" + audio_sample_rate: int = 16000 + """Sample rate of the input audio in Hz""" + audio_num_channels: int = 1 + """Number of audio channels""" + + +DEFAULT_ROOM_INPUT_OPTIONS = RoomInputOptions() + + class RoomInput: """Creates video and audio streams from a remote participant in a LiveKit room""" class _RemoteTrackHandler: """Manages streaming from a remote track to a aio.Chan""" - def __init__( - self, - track_to_stream: Callable[ - [rtc.RemoteTrack], rtc.AudioStream | rtc.VideoStream - ], - enabled: bool = False, - ) -> None: - self.track_to_stream = track_to_stream - self.enabled = enabled + def __init__(self, options: RoomInputOptions) -> None: + self._options = options - self.track: rtc.RemoteTrack | None = None - self.stream_task: asyncio.Task | None = None - self._data_ch: aio.Chan[rtc.AudioFrame | rtc.VideoFrame] | None = None + self._remote_track: rtc.RemoteTrack | None = None + self._main_atask: asyncio.Task | None = None - if enabled: - self._data_ch = aio.Chan() + self._data_ch: aio.Chan[rtc.AudioFrame | rtc.VideoFrame] = aio.Chan() + self._enabled = True # stream the frame or not @property - def data_ch(self) -> aio.Chan | None: + def data_ch(self) -> aio.Chan[rtc.AudioFrame | rtc.VideoFrame]: return self._data_ch + @property + def enabled(self) -> bool: + return self._enabled + + @enabled.setter + def enabled(self, enabled: bool) -> None: + """Drop frames if the stream is not enabled""" + self._enabled = enabled + def setup(self, track: rtc.RemoteTrack) -> None: """Set up streaming for a new track""" - if track == self.track: + if track == self._remote_track: return - if self.stream_task is not None: - self.stream_task.cancel() + if self._main_atask is not None: + self._main_atask.cancel() + + self._remote_track = track + if isinstance(track, rtc.RemoteAudioTrack): + stream = rtc.AudioStream( + track=track, + sample_rate=self._options.audio_sample_rate, + num_channels=self._options.audio_num_channels, + ) + elif isinstance(track, rtc.RemoteVideoTrack): + stream = rtc.VideoStream(track=track) + else: + raise ValueError(f"Unsupported track source type: {type(track)}") - assert self._data_ch is not None - self.track = track - stream = self.track_to_stream(track) - self.stream_task = asyncio.create_task(self._stream_frames(stream)) + self._main_atask = asyncio.create_task(self._read_stream(stream)) - async def _stream_frames( - self, stream: rtc.AudioStream | rtc.VideoStream - ) -> None: - assert self._data_ch is not None + async def _read_stream(self, stream: rtc.AudioStream | rtc.VideoStream) -> None: async for event in stream: + if not self._enabled: + continue self._data_ch.send_nowait(event.frame) + async def aclose(self) -> None: + if self._main_atask is not None: + await aio.graceful_cancel(self._main_atask) + self._main_atask = None + self._remote_track = None + self._data_ch.close() + def __init__( self, room: rtc.Room, participant_identity: Optional[str] = None, - *, - audio_enabled: bool = True, - video_enabled: bool = False, - audio_sample_rate: int = 16000, - audio_num_channels: int = 1, + options: RoomInputOptions = DEFAULT_ROOM_INPUT_OPTIONS, ) -> None: """ Args: room: The LiveKit room to get streams from participant_identity: Optional identity of the participant to get streams from. If None, will use the first participant that joins. - audio_enabled: Whether to enable audio input - video_enabled: Whether to enable video input - audio_sample_rate: Sample rate of the audio in Hz - audio_num_channels: Number of audio channels + options: RoomInputOptions """ + self._options = options self._room = room self._expected_identity = participant_identity self._participant: rtc.RemoteParticipant | None = None self._closed = False - # set up track streamers - self._audio_streamer = self._RemoteTrackHandler( - track_to_stream=partial( - rtc.AudioStream, - sample_rate=audio_sample_rate, - num_channels=audio_num_channels, - ), - enabled=audio_enabled, - ) - self._video_streamer = self._RemoteTrackHandler( - track_to_stream=rtc.VideoStream, - enabled=video_enabled, - ) + # set up track handlers + self._track_handlers: dict[ + rtc.TrackSource.ValueType, RoomInput._RemoteTrackHandler + ] = {} + if self._options.subscribe_audio: + self._track_handlers[rtc.TrackSource.SOURCE_MICROPHONE] = ( + self._RemoteTrackHandler(self._options) + ) + if self._options.subscribe_video: + self._track_handlers[rtc.TrackSource.SOURCE_CAMERA] = ( + self._RemoteTrackHandler(self._options) + ) self._participant_ready = asyncio.Event() self._room.on("participant_connected", self._on_participant_connected) @@ -198,11 +222,32 @@ async def wait_for_participant(self) -> rtc.RemoteParticipant: @property def audio(self) -> AudioStream | None: - return self._audio_streamer.data_ch + if rtc.TrackSource.SOURCE_MICROPHONE not in self._track_handlers: + return None + return self._track_handlers[rtc.TrackSource.SOURCE_MICROPHONE].data_ch @property def video(self) -> VideoStream | None: - return self._video_streamer.data_ch + if rtc.TrackSource.SOURCE_CAMERA not in self._track_handlers: + return None + return self._track_handlers[rtc.TrackSource.SOURCE_CAMERA].data_ch + + @property + def audio_enabled(self) -> bool: + if rtc.TrackSource.SOURCE_MICROPHONE not in self._track_handlers: + return False + return self._track_handlers[rtc.TrackSource.SOURCE_MICROPHONE].enabled + + @property + def video_enabled(self) -> bool: + if rtc.TrackSource.SOURCE_CAMERA not in self._track_handlers: + return False + return self._track_handlers[rtc.TrackSource.SOURCE_CAMERA].enabled + + def set_enabled(self, source: rtc.TrackSource.ValueType, enabled: bool) -> None: + if source not in self._track_handlers: + raise ValueError(f"Track {source} is not subscribed") + self._track_handlers[source].enabled = enabled def _link_participant(self, participant: rtc.RemoteParticipant) -> None: if ( @@ -233,12 +278,8 @@ def _subscribe_to_tracks(self, *args) -> None: for publication in self._participant.track_publications.values(): # skip tracks we don't care about - streamer = { - rtc.TrackSource.SOURCE_MICROPHONE: self._audio_streamer, - rtc.TrackSource.SOURCE_CAMERA: self._video_streamer, - }.get(publication.source) - - if streamer is None or not streamer.enabled: + handler = self._track_handlers.get(publication.source) + if handler is None: continue # subscribe and setup streaming @@ -247,7 +288,7 @@ def _subscribe_to_tracks(self, *args) -> None: track: rtc.RemoteTrack | None = publication.track if track is not None: - streamer.setup(track) + handler.setup(track) async def aclose(self) -> None: if self._closed: @@ -259,7 +300,5 @@ async def aclose(self) -> None: self._room.off("track_subscribed", self._subscribe_to_tracks) self._participant = None - # Cancel stream tasks - for streamer in [self._audio_streamer, self._video_streamer]: - if streamer.stream_task is not None: - await aio.graceful_cancel(streamer.stream_task) + for handler in self._track_handlers.values(): + await handler.aclose() From 896d65e390a730a94574765b028c540d64d573bd Mon Sep 17 00:00:00 2001 From: Long Chen Date: Thu, 23 Jan 2025 22:42:13 +0800 Subject: [PATCH 07/15] add video buffer size --- .../livekit/agents/pipeline/room_io.py | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/livekit-agents/livekit/agents/pipeline/room_io.py b/livekit-agents/livekit/agents/pipeline/room_io.py index d7aeac835..fe6f8de80 100644 --- a/livekit-agents/livekit/agents/pipeline/room_io.py +++ b/livekit-agents/livekit/agents/pipeline/room_io.py @@ -6,6 +6,7 @@ from ..utils import aio from .io import AudioSink, AudioStream, VideoStream +from .log import logger class RoomAudioSink(AudioSink): @@ -98,6 +99,10 @@ class RoomInputOptions: """Sample rate of the input audio in Hz""" audio_num_channels: int = 1 """Number of audio channels""" + video_buffer_size: Optional[int] = 30 + """Buffer size of the video in number of frames, None means unlimited""" + warn_dropped_video_frames: bool = True + """Whether to warn when video frames are dropped""" DEFAULT_ROOM_INPUT_OPTIONS = RoomInputOptions() @@ -158,6 +163,22 @@ async def _read_stream(self, stream: rtc.AudioStream | rtc.VideoStream) -> None: if not self._enabled: continue self._data_ch.send_nowait(event.frame) + if ( + isinstance(event.frame, rtc.VideoFrame) + and self._options.video_buffer_size is not None + ): + dropped = 0 + while self._data_ch.qsize() > self._options.video_buffer_size: + await self._data_ch.recv() # drop old frames if buffer is full + dropped += 1 + if dropped > 0 and self._options.warn_dropped_video_frames: + logger.warning( + "dropping video frames since buffer is full", + extra={ + "buffer_size": self._options.video_buffer_size, + "dropped": dropped, + }, + ) async def aclose(self) -> None: if self._main_atask is not None: From 7196adb6dae3809f50cc7b47e958dd96fc033b90 Mon Sep 17 00:00:00 2001 From: Long Chen Date: Thu, 23 Jan 2025 23:27:37 +0800 Subject: [PATCH 08/15] remove remote tracker handler --- .../livekit/agents/pipeline/room_io.py | 186 ++++++++---------- 1 file changed, 86 insertions(+), 100 deletions(-) diff --git a/livekit-agents/livekit/agents/pipeline/room_io.py b/livekit-agents/livekit/agents/pipeline/room_io.py index fe6f8de80..bd0944f37 100644 --- a/livekit-agents/livekit/agents/pipeline/room_io.py +++ b/livekit-agents/livekit/agents/pipeline/room_io.py @@ -108,84 +108,18 @@ class RoomInputOptions: DEFAULT_ROOM_INPUT_OPTIONS = RoomInputOptions() -class RoomInput: - """Creates video and audio streams from a remote participant in a LiveKit room""" - - class _RemoteTrackHandler: - """Manages streaming from a remote track to a aio.Chan""" - - def __init__(self, options: RoomInputOptions) -> None: - self._options = options - - self._remote_track: rtc.RemoteTrack | None = None - self._main_atask: asyncio.Task | None = None - - self._data_ch: aio.Chan[rtc.AudioFrame | rtc.VideoFrame] = aio.Chan() - self._enabled = True # stream the frame or not - - @property - def data_ch(self) -> aio.Chan[rtc.AudioFrame | rtc.VideoFrame]: - return self._data_ch - - @property - def enabled(self) -> bool: - return self._enabled - - @enabled.setter - def enabled(self, enabled: bool) -> None: - """Drop frames if the stream is not enabled""" - self._enabled = enabled - - def setup(self, track: rtc.RemoteTrack) -> None: - """Set up streaming for a new track""" - if track == self._remote_track: - return - - if self._main_atask is not None: - self._main_atask.cancel() +@dataclass +class _TrackState: + """State for a single audio or video track""" - self._remote_track = track - if isinstance(track, rtc.RemoteAudioTrack): - stream = rtc.AudioStream( - track=track, - sample_rate=self._options.audio_sample_rate, - num_channels=self._options.audio_num_channels, - ) - elif isinstance(track, rtc.RemoteVideoTrack): - stream = rtc.VideoStream(track=track) - else: - raise ValueError(f"Unsupported track source type: {type(track)}") + stream: aio.Chan[rtc.AudioFrame | rtc.VideoFrame] + enabled: bool = True + task: Optional[asyncio.Task] = None + track: Optional[rtc.RemoteTrack] = None - self._main_atask = asyncio.create_task(self._read_stream(stream)) - async def _read_stream(self, stream: rtc.AudioStream | rtc.VideoStream) -> None: - async for event in stream: - if not self._enabled: - continue - self._data_ch.send_nowait(event.frame) - if ( - isinstance(event.frame, rtc.VideoFrame) - and self._options.video_buffer_size is not None - ): - dropped = 0 - while self._data_ch.qsize() > self._options.video_buffer_size: - await self._data_ch.recv() # drop old frames if buffer is full - dropped += 1 - if dropped > 0 and self._options.warn_dropped_video_frames: - logger.warning( - "dropping video frames since buffer is full", - extra={ - "buffer_size": self._options.video_buffer_size, - "dropped": dropped, - }, - ) - - async def aclose(self) -> None: - if self._main_atask is not None: - await aio.graceful_cancel(self._main_atask) - self._main_atask = None - self._remote_track = None - self._data_ch.close() +class RoomInput: + """Creates video and audio streams from a remote participant in a LiveKit room""" def __init__( self, @@ -206,18 +140,17 @@ def __init__( self._participant: rtc.RemoteParticipant | None = None self._closed = False - # set up track handlers - self._track_handlers: dict[ - rtc.TrackSource.ValueType, RoomInput._RemoteTrackHandler - ] = {} + # Track state + self._track_states: dict[rtc.TrackSource.ValueType, _TrackState] = {} + + # Initialize track state for subscribed sources if self._options.subscribe_audio: - self._track_handlers[rtc.TrackSource.SOURCE_MICROPHONE] = ( - self._RemoteTrackHandler(self._options) - ) + source = rtc.TrackSource.SOURCE_MICROPHONE + self._track_states[source] = _TrackState(stream=aio.Chan()) + if self._options.subscribe_video: - self._track_handlers[rtc.TrackSource.SOURCE_CAMERA] = ( - self._RemoteTrackHandler(self._options) - ) + source = rtc.TrackSource.SOURCE_CAMERA + self._track_states[source] = _TrackState(stream=aio.Chan()) self._participant_ready = asyncio.Event() self._room.on("participant_connected", self._on_participant_connected) @@ -243,32 +176,32 @@ async def wait_for_participant(self) -> rtc.RemoteParticipant: @property def audio(self) -> AudioStream | None: - if rtc.TrackSource.SOURCE_MICROPHONE not in self._track_handlers: + if rtc.TrackSource.SOURCE_MICROPHONE not in self._track_states: return None - return self._track_handlers[rtc.TrackSource.SOURCE_MICROPHONE].data_ch + return self._track_states[rtc.TrackSource.SOURCE_MICROPHONE].stream @property def video(self) -> VideoStream | None: - if rtc.TrackSource.SOURCE_CAMERA not in self._track_handlers: + if rtc.TrackSource.SOURCE_CAMERA not in self._track_states: return None - return self._track_handlers[rtc.TrackSource.SOURCE_CAMERA].data_ch + return self._track_states[rtc.TrackSource.SOURCE_CAMERA].stream @property def audio_enabled(self) -> bool: - if rtc.TrackSource.SOURCE_MICROPHONE not in self._track_handlers: + if rtc.TrackSource.SOURCE_MICROPHONE not in self._track_states: return False - return self._track_handlers[rtc.TrackSource.SOURCE_MICROPHONE].enabled + return self._track_states[rtc.TrackSource.SOURCE_MICROPHONE].enabled @property def video_enabled(self) -> bool: - if rtc.TrackSource.SOURCE_CAMERA not in self._track_handlers: + if rtc.TrackSource.SOURCE_CAMERA not in self._track_states: return False - return self._track_handlers[rtc.TrackSource.SOURCE_CAMERA].enabled + return self._track_states[rtc.TrackSource.SOURCE_CAMERA].enabled def set_enabled(self, source: rtc.TrackSource.ValueType, enabled: bool) -> None: - if source not in self._track_handlers: + if source not in self._track_states: raise ValueError(f"Track {source} is not subscribed") - self._track_handlers[source].enabled = enabled + self._track_states[source].enabled = enabled def _link_participant(self, participant: rtc.RemoteParticipant) -> None: if ( @@ -299,8 +232,7 @@ def _subscribe_to_tracks(self, *args) -> None: for publication in self._participant.track_publications.values(): # skip tracks we don't care about - handler = self._track_handlers.get(publication.source) - if handler is None: + if publication.source not in self._track_states: continue # subscribe and setup streaming @@ -309,7 +241,57 @@ def _subscribe_to_tracks(self, *args) -> None: track: rtc.RemoteTrack | None = publication.track if track is not None: - handler.setup(track) + self._setup_track(publication.source, track) + + def _setup_track( + self, source: rtc.TrackSource.ValueType, track: rtc.RemoteTrack + ) -> None: + """Set up streaming for a new track""" + state = self._track_states[source] + if track == state.track: + return + + # Cancel existing task if any + if state.task is not None: + state.task.cancel() + + state.track = track + if isinstance(track, rtc.RemoteAudioTrack): + stream = rtc.AudioStream( + track=track, + sample_rate=self._options.audio_sample_rate, + num_channels=self._options.audio_num_channels, + ) + elif isinstance(track, rtc.RemoteVideoTrack): + stream = rtc.VideoStream(track=track) + else: + raise ValueError(f"Unsupported track source type: {type(track)}") + + async def _read_stream(stream: rtc.AudioStream | rtc.VideoStream) -> None: + """Handle reading from an audio or video stream""" + + async for event in stream: + if not state.enabled: + continue + state.stream.send_nowait(event.frame) + if ( + isinstance(event.frame, rtc.VideoFrame) + and self._options.video_buffer_size is not None + ): + dropped = 0 + while state.stream.qsize() > self._options.video_buffer_size: + await state.stream.recv() # drop old frames if buffer is full + dropped += 1 + if dropped > 0 and self._options.warn_dropped_video_frames: + logger.warning( + "dropping video frames since buffer is full", + extra={ + "buffer_size": self._options.video_buffer_size, + "dropped": dropped, + }, + ) + + state.task = asyncio.create_task(_read_stream(stream)) async def aclose(self) -> None: if self._closed: @@ -321,5 +303,9 @@ async def aclose(self) -> None: self._room.off("track_subscribed", self._subscribe_to_tracks) self._participant = None - for handler in self._track_handlers.values(): - await handler.aclose() + # Cancel all track tasks and close channels + for state in self._track_states.values(): + if state.task is not None: + await aio.graceful_cancel(state.task) + state.stream.close() + self._track_states.clear() From 5df24dc4a25992070a0a65f9bdebfce088a453ee Mon Sep 17 00:00:00 2001 From: Long Chen Date: Fri, 24 Jan 2025 00:06:22 +0800 Subject: [PATCH 09/15] use stream.from_participant --- .../livekit/agents/pipeline/room_io.py | 176 ++++-------------- 1 file changed, 40 insertions(+), 136 deletions(-) diff --git a/livekit-agents/livekit/agents/pipeline/room_io.py b/livekit-agents/livekit/agents/pipeline/room_io.py index bd0944f37..25227adcd 100644 --- a/livekit-agents/livekit/agents/pipeline/room_io.py +++ b/livekit-agents/livekit/agents/pipeline/room_io.py @@ -1,12 +1,10 @@ import asyncio from dataclasses import dataclass -from typing import Optional +from typing import AsyncIterator, Optional from livekit import rtc -from ..utils import aio -from .io import AudioSink, AudioStream, VideoStream -from .log import logger +from .io import AudioSink class RoomAudioSink(AudioSink): @@ -99,25 +97,15 @@ class RoomInputOptions: """Sample rate of the input audio in Hz""" audio_num_channels: int = 1 """Number of audio channels""" - video_buffer_size: Optional[int] = 30 - """Buffer size of the video in number of frames, None means unlimited""" - warn_dropped_video_frames: bool = True - """Whether to warn when video frames are dropped""" + audio_queue_capacity: int = 0 + """Capacity of the internal audio queue, 0 means unlimited""" + video_queue_capacity: int = 0 + """Capacity of the internal video queue, 0 means unlimited""" DEFAULT_ROOM_INPUT_OPTIONS = RoomInputOptions() -@dataclass -class _TrackState: - """State for a single audio or video track""" - - stream: aio.Chan[rtc.AudioFrame | rtc.VideoFrame] - enabled: bool = True - task: Optional[asyncio.Task] = None - track: Optional[rtc.RemoteTrack] = None - - class RoomInput: """Creates video and audio streams from a remote participant in a LiveKit room""" @@ -140,23 +128,12 @@ def __init__( self._participant: rtc.RemoteParticipant | None = None self._closed = False - # Track state - self._track_states: dict[rtc.TrackSource.ValueType, _TrackState] = {} - - # Initialize track state for subscribed sources - if self._options.subscribe_audio: - source = rtc.TrackSource.SOURCE_MICROPHONE - self._track_states[source] = _TrackState(stream=aio.Chan()) - - if self._options.subscribe_video: - source = rtc.TrackSource.SOURCE_CAMERA - self._track_states[source] = _TrackState(stream=aio.Chan()) + # streams + self._audio_stream: Optional[rtc.AudioStream] = None + self._video_stream: Optional[rtc.VideoStream] = None self._participant_ready = asyncio.Event() self._room.on("participant_connected", self._on_participant_connected) - self._room.on("track_published", self._subscribe_to_tracks) - self._room.on("track_subscribed", self._subscribe_to_tracks) - self._room.on("reconnected", self._subscribe_to_tracks) # try to find participant if self._expected_identity is not None: @@ -175,33 +152,22 @@ async def wait_for_participant(self) -> rtc.RemoteParticipant: return self._participant @property - def audio(self) -> AudioStream | None: - if rtc.TrackSource.SOURCE_MICROPHONE not in self._track_states: + def audio(self) -> AsyncIterator[rtc.AudioFrame]: + if self._audio_stream is None: return None - return self._track_states[rtc.TrackSource.SOURCE_MICROPHONE].stream + return self._read_stream(self._audio_stream) @property - def video(self) -> VideoStream | None: - if rtc.TrackSource.SOURCE_CAMERA not in self._track_states: + def video(self) -> AsyncIterator[rtc.VideoFrame]: + if self._video_stream is None: return None - return self._track_states[rtc.TrackSource.SOURCE_CAMERA].stream + return self._read_stream(self._video_stream) - @property - def audio_enabled(self) -> bool: - if rtc.TrackSource.SOURCE_MICROPHONE not in self._track_states: - return False - return self._track_states[rtc.TrackSource.SOURCE_MICROPHONE].enabled - - @property - def video_enabled(self) -> bool: - if rtc.TrackSource.SOURCE_CAMERA not in self._track_states: - return False - return self._track_states[rtc.TrackSource.SOURCE_CAMERA].enabled - - def set_enabled(self, source: rtc.TrackSource.ValueType, enabled: bool) -> None: - if source not in self._track_states: - raise ValueError(f"Track {source} is not subscribed") - self._track_states[source].enabled = enabled + async def _read_stream( + self, stream: rtc.AudioStream | rtc.VideoStream + ) -> AsyncIterator[rtc.AudioFrame | rtc.VideoFrame]: + async for event in stream: + yield event.frame def _link_participant(self, participant: rtc.RemoteParticipant) -> None: if ( @@ -211,101 +177,39 @@ def _link_participant(self, participant: rtc.RemoteParticipant) -> None: return self._participant = participant - self._participant_ready.set() # set up tracks - self._subscribe_to_tracks() + if self._options.subscribe_audio: + self._audio_stream = rtc.AudioStream.from_participant( + participant=participant, + track_source=rtc.TrackSource.SOURCE_MICROPHONE, + sample_rate=self._options.audio_sample_rate, + num_channels=self._options.audio_num_channels, + capacity=self._options.audio_queue_capacity, + ) + if self._options.subscribe_video: + self._video_stream = rtc.VideoStream.from_participant( + participant=participant, + track_source=rtc.TrackSource.SOURCE_CAMERA, + capacity=self._options.video_queue_capacity, + ) + + self._participant_ready.set() def _on_participant_connected(self, participant: rtc.RemoteParticipant) -> None: if self._participant is not None: return self._link_participant(participant) - def _subscribe_to_tracks(self, *args) -> None: - if self._participant is None: - return - if args and isinstance(args[-1], rtc.RemoteParticipant): - # track_published: (publication, participant) - # track_subscribed: (track, publication, participant) - if args[-1].identity != self._participant.identity: - return - - for publication in self._participant.track_publications.values(): - # skip tracks we don't care about - if publication.source not in self._track_states: - continue - - # subscribe and setup streaming - if not publication.subscribed: - publication.set_subscribed(True) - - track: rtc.RemoteTrack | None = publication.track - if track is not None: - self._setup_track(publication.source, track) - - def _setup_track( - self, source: rtc.TrackSource.ValueType, track: rtc.RemoteTrack - ) -> None: - """Set up streaming for a new track""" - state = self._track_states[source] - if track == state.track: - return - - # Cancel existing task if any - if state.task is not None: - state.task.cancel() - - state.track = track - if isinstance(track, rtc.RemoteAudioTrack): - stream = rtc.AudioStream( - track=track, - sample_rate=self._options.audio_sample_rate, - num_channels=self._options.audio_num_channels, - ) - elif isinstance(track, rtc.RemoteVideoTrack): - stream = rtc.VideoStream(track=track) - else: - raise ValueError(f"Unsupported track source type: {type(track)}") - - async def _read_stream(stream: rtc.AudioStream | rtc.VideoStream) -> None: - """Handle reading from an audio or video stream""" - - async for event in stream: - if not state.enabled: - continue - state.stream.send_nowait(event.frame) - if ( - isinstance(event.frame, rtc.VideoFrame) - and self._options.video_buffer_size is not None - ): - dropped = 0 - while state.stream.qsize() > self._options.video_buffer_size: - await state.stream.recv() # drop old frames if buffer is full - dropped += 1 - if dropped > 0 and self._options.warn_dropped_video_frames: - logger.warning( - "dropping video frames since buffer is full", - extra={ - "buffer_size": self._options.video_buffer_size, - "dropped": dropped, - }, - ) - - state.task = asyncio.create_task(_read_stream(stream)) - async def aclose(self) -> None: if self._closed: raise RuntimeError("RoomInput already closed") self._closed = True self._room.off("participant_connected", self._on_participant_connected) - self._room.off("track_published", self._subscribe_to_tracks) - self._room.off("track_subscribed", self._subscribe_to_tracks) self._participant = None - # Cancel all track tasks and close channels - for state in self._track_states.values(): - if state.task is not None: - await aio.graceful_cancel(state.task) - state.stream.close() - self._track_states.clear() + if self._audio_stream is not None: + await self._audio_stream.aclose() + if self._video_stream is not None: + await self._video_stream.aclose() From 44eb659d299336c0a7855b923ab5ca9582602abb Mon Sep 17 00:00:00 2001 From: Long Chen Date: Fri, 24 Jan 2025 11:57:14 +0800 Subject: [PATCH 10/15] update room input stream --- .../livekit/agents/pipeline/room_io.py | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/livekit-agents/livekit/agents/pipeline/room_io.py b/livekit-agents/livekit/agents/pipeline/room_io.py index 25227adcd..dd3ea0dd3 100644 --- a/livekit-agents/livekit/agents/pipeline/room_io.py +++ b/livekit-agents/livekit/agents/pipeline/room_io.py @@ -7,6 +7,7 @@ from .io import AudioSink +# TODO: add RoomOutput that has audio and video sinks, optionally with av sync? class RoomAudioSink(AudioSink): """AudioSink implementation that publishes audio to a LiveKit room""" @@ -89,9 +90,9 @@ def _notify_playback_finished( @dataclass class RoomInputOptions: - subscribe_audio: bool = True + audio_enabled: bool = True """Whether to subscribe to audio""" - subscribe_video: bool = False + video_enabled: bool = False """Whether to subscribe to video""" audio_sample_rate: int = 16000 """Sample rate of the input audio in Hz""" @@ -152,22 +153,26 @@ async def wait_for_participant(self) -> rtc.RemoteParticipant: return self._participant @property - def audio(self) -> AsyncIterator[rtc.AudioFrame]: + def audio(self) -> AsyncIterator[rtc.AudioFrame] | None: if self._audio_stream is None: return None - return self._read_stream(self._audio_stream) + + async def _read_stream(): + async for event in self._audio_stream: + yield event.frame + + return _read_stream() @property - def video(self) -> AsyncIterator[rtc.VideoFrame]: + def video(self) -> AsyncIterator[rtc.VideoFrame] | None: if self._video_stream is None: return None - return self._read_stream(self._video_stream) - async def _read_stream( - self, stream: rtc.AudioStream | rtc.VideoStream - ) -> AsyncIterator[rtc.AudioFrame | rtc.VideoFrame]: - async for event in stream: - yield event.frame + async def _read_stream(): + async for event in self._video_stream: + yield event.frame + + return _read_stream() def _link_participant(self, participant: rtc.RemoteParticipant) -> None: if ( @@ -179,7 +184,7 @@ def _link_participant(self, participant: rtc.RemoteParticipant) -> None: self._participant = participant # set up tracks - if self._options.subscribe_audio: + if self._options.audio_enabled: self._audio_stream = rtc.AudioStream.from_participant( participant=participant, track_source=rtc.TrackSource.SOURCE_MICROPHONE, @@ -187,7 +192,7 @@ def _link_participant(self, participant: rtc.RemoteParticipant) -> None: num_channels=self._options.audio_num_channels, capacity=self._options.audio_queue_capacity, ) - if self._options.subscribe_video: + if self._options.video_enabled: self._video_stream = rtc.VideoStream.from_participant( participant=participant, track_source=rtc.TrackSource.SOURCE_CAMERA, @@ -211,5 +216,7 @@ async def aclose(self) -> None: if self._audio_stream is not None: await self._audio_stream.aclose() + self._audio_stream = None if self._video_stream is not None: await self._video_stream.aclose() + self._video_stream = None From f9238205fdb738739cf56182b056a71fca646f0d Mon Sep 17 00:00:00 2001 From: Long Chen Date: Fri, 24 Jan 2025 22:10:41 +0800 Subject: [PATCH 11/15] add room output --- examples/roomio_worker.py | 14 +- .../livekit/agents/pipeline/room_io.py | 189 ++++++++++-------- 2 files changed, 115 insertions(+), 88 deletions(-) diff --git a/examples/roomio_worker.py b/examples/roomio_worker.py index a88e0aaf6..ab57555f5 100644 --- a/examples/roomio_worker.py +++ b/examples/roomio_worker.py @@ -5,7 +5,7 @@ from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, WorkerType, cli from livekit.agents.pipeline import AgentTask, ChatCLI, PipelineAgent from livekit.agents.pipeline.io import PlaybackFinishedEvent -from livekit.agents.pipeline.room_io import RoomAudioSink, RoomInput, RoomInputOptions +from livekit.agents.pipeline.room_io import RoomOutput, RoomInput, RoomInputOptions from livekit.plugins import cartesia, deepgram, openai, silero logger = logging.getLogger("my-worker") @@ -32,17 +32,17 @@ async def entrypoint(ctx: JobContext): room_input = RoomInput( ctx.room, options=RoomInputOptions( - subscribe_audio=True, - subscribe_video=False, + audio_enabled=True, + video_enabled=False, ), ) - audio_output = RoomAudioSink(ctx.room, sample_rate=24000, num_channels=1) + room_output = RoomOutput(ctx.room, sample_rate=24000, num_channels=1) agent.input.audio = room_input.audio - agent.output.audio = audio_output + agent.output.audio = room_output.audio # TODO: the interrupted flag is not set correctly - @audio_output.on("playback_finished") + @room_output.audio.on("playback_finished") def on_playback_finished(ev: PlaybackFinishedEvent) -> None: logger.info( "playback_finished", @@ -53,7 +53,7 @@ def on_playback_finished(ev: PlaybackFinishedEvent) -> None: ) await room_input.wait_for_participant() - await audio_output.start() + await room_output.start() if __name__ == "__main__": diff --git a/livekit-agents/livekit/agents/pipeline/room_io.py b/livekit-agents/livekit/agents/pipeline/room_io.py index dd3ea0dd3..5064cd66b 100644 --- a/livekit-agents/livekit/agents/pipeline/room_io.py +++ b/livekit-agents/livekit/agents/pipeline/room_io.py @@ -7,87 +7,6 @@ from .io import AudioSink -# TODO: add RoomOutput that has audio and video sinks, optionally with av sync? -class RoomAudioSink(AudioSink): - """AudioSink implementation that publishes audio to a LiveKit room""" - - def __init__( - self, room: rtc.Room, *, sample_rate: int = 24000, num_channels: int = 1 - ) -> None: - """Initialize the RoomAudioSink - - Args: - room: The LiveKit room to publish audio to - sample_rate: Sample rate of the audio in Hz - num_channels: Number of audio channels - """ - super().__init__(sample_rate=sample_rate) - self._room = room - - # Create audio source and track - self._audio_source = rtc.AudioSource( - sample_rate=sample_rate, num_channels=num_channels - ) - self._track = rtc.LocalAudioTrack.create_audio_track( - "assistant_voice", self._audio_source - ) - - self._publication: rtc.LocalTrackPublication | None = None - self._publish_task: asyncio.Task | None = None - self._pushed_duration: float | None = None - - async def start(self) -> None: - """Start publishing the audio track to the room""" - if self._publication: - return - - # TODO: handle reconnected, do we need to cancel and re-publish? seems no - self._publication = await self._room.local_participant.publish_track( - self._track, - rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE), - ) - - # is this necessary? - await self._publication.wait_for_subscription() - - async def capture_frame(self, frame: rtc.AudioFrame) -> None: - """Capture an audio frame and publish it to the room""" - await super().capture_frame(frame) - - if self._pushed_duration is None: - self._pushed_duration = 0.0 - - self._pushed_duration += frame.duration - await self._audio_source.capture_frame(frame) - - def flush(self) -> None: - """Flush the current audio segment and notify when complete""" - super().flush() - - if self._pushed_duration is not None: - self._notify_playback_finished(self._pushed_duration, interrupted=False) - self._pushed_duration = None - - def clear_buffer(self) -> None: - """Clear the audio buffer immediately""" - self._audio_source.clear_queue() - - if self._pushed_duration is not None: - self._notify_playback_finished(self._pushed_duration, interrupted=True) - self._pushed_duration = None - - def _notify_playback_finished( - self, playback_position: float, interrupted: bool - ) -> None: - """Wait for the audio to be played out and notify when complete""" - playout_task = asyncio.create_task(self._audio_source.wait_for_playout()) - playout_task.add_done_callback( - lambda _: self.on_playback_finished( - playback_position=playback_position, interrupted=interrupted - ) - ) - - @dataclass class RoomInputOptions: audio_enabled: bool = True @@ -220,3 +139,111 @@ async def aclose(self) -> None: if self._video_stream is not None: await self._video_stream.aclose() self._video_stream = None + + +class RoomOutput: + """Manages audio output to a LiveKit room""" + + def __init__( + self, room: rtc.Room, *, sample_rate: int = 24000, num_channels: int = 1 + ) -> None: + """Initialize the RoomOutput + + Args: + room: The LiveKit room to publish media to + sample_rate: Sample rate of the audio in Hz + num_channels: Number of audio channels + """ + self._audio_sink = RoomAudioSink( + room=room, sample_rate=sample_rate, num_channels=num_channels + ) + + async def start(self) -> None: + await self._audio_sink.start() + + @property + def audio(self) -> "RoomAudioSink": + return self._audio_sink + + +class RoomAudioSink(AudioSink): + """AudioSink implementation that publishes audio to a LiveKit room""" + + def __init__( + self, room: rtc.Room, *, sample_rate: int = 24000, num_channels: int = 1 + ) -> None: + """Initialize the RoomAudioSink + + Args: + room: The LiveKit room to publish audio to + sample_rate: Sample rate of the audio in Hz + num_channels: Number of audio channels + """ + super().__init__(sample_rate=sample_rate) + self._room = room + + # create audio source + self._audio_source = rtc.AudioSource( + sample_rate=sample_rate, num_channels=num_channels + ) + + self._publication: rtc.LocalTrackPublication | None = None + self._pushed_duration: float | None = None + + def _on_reconnected(self) -> None: + self._publication = None + asyncio.create_task(self.start()) + + self._room.on("reconnected", _on_reconnected) + + async def start(self) -> None: + """Start publishing the audio track to the room""" + if self._publication: + return + + track = rtc.LocalAudioTrack.create_audio_track( + "assistant_voice", self._audio_source + ) + self._publication = await self._room.local_participant.publish_track( + track=track, + options=rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE), + ) + + await self._publication.wait_for_subscription() + + async def capture_frame(self, frame: rtc.AudioFrame) -> None: + """Capture an audio frame and publish it to the room""" + await super().capture_frame(frame) + + if self._pushed_duration is None: + self._pushed_duration = 0.0 + + self._pushed_duration += frame.duration + await self._audio_source.capture_frame(frame) + + def flush(self) -> None: + """Flush the current audio segment and notify when complete""" + super().flush() + + if self._pushed_duration is not None: + self._notify_playback_finished(self._pushed_duration, interrupted=False) + self._pushed_duration = None + + def clear_buffer(self) -> None: + """Clear the audio buffer immediately""" + self._audio_source.clear_queue() + + if self._pushed_duration is not None: + self._notify_playback_finished(self._pushed_duration, interrupted=True) + self._pushed_duration = None + + def _notify_playback_finished( + self, playback_position: float, interrupted: bool + ) -> None: + """Wait for the audio to be played out and notify when complete""" + playout_task = asyncio.create_task(self._audio_source.wait_for_playout()) + playout_task.add_done_callback( + lambda _: self.on_playback_finished( + playback_position=playback_position, interrupted=interrupted + ) + ) From bdb5c652957295a84d12851a1402ad95345c1e85 Mon Sep 17 00:00:00 2001 From: Long Chen Date: Sun, 26 Jan 2025 12:27:48 +0800 Subject: [PATCH 12/15] make RoomIO default for agent --- examples/roomio_worker.py | 39 +++++++++---------- .../livekit/agents/pipeline/pipeline_agent.py | 22 ++++++++++- 2 files changed, 40 insertions(+), 21 deletions(-) diff --git a/examples/roomio_worker.py b/examples/roomio_worker.py index ab57555f5..c1f76be51 100644 --- a/examples/roomio_worker.py +++ b/examples/roomio_worker.py @@ -23,26 +23,28 @@ async def entrypoint(ctx: JobContext): llm=openai.realtime.RealtimeModel(), ) ) - agent.start() - - # # start a chat inside the CLI - # chat_cli = ChatCLI(agent) - # await chat_cli.run() - - room_input = RoomInput( - ctx.room, - options=RoomInputOptions( - audio_enabled=True, - video_enabled=False, - ), - ) - room_output = RoomOutput(ctx.room, sample_rate=24000, num_channels=1) - agent.input.audio = room_input.audio - agent.output.audio = room_output.audio + # default use RoomIO if room is provided + await agent.start(room=ctx.room) + + # # Or use RoomInput and RoomOutput explicitly + # room_input = RoomInput( + # ctx.room, + # options=RoomInputOptions( + # audio_enabled=True, + # video_enabled=False, + # ), + # ) + # room_output = RoomOutput(ctx.room, sample_rate=24000, num_channels=1) + + # agent.input.audio = room_input.audio + # agent.output.audio = room_output.audio + + # await room_input.wait_for_participant() + # await room_output.start() # TODO: the interrupted flag is not set correctly - @room_output.audio.on("playback_finished") + @agent.output.audio.on("playback_finished") def on_playback_finished(ev: PlaybackFinishedEvent) -> None: logger.info( "playback_finished", @@ -52,9 +54,6 @@ def on_playback_finished(ev: PlaybackFinishedEvent) -> None: }, ) - await room_input.wait_for_participant() - await room_output.start() - if __name__ == "__main__": # WorkerType.ROOM is the default worker type which will create an agent for every room. diff --git a/livekit-agents/livekit/agents/pipeline/pipeline_agent.py b/livekit-agents/livekit/agents/pipeline/pipeline_agent.py index 53172c0c8..461d4f033 100644 --- a/livekit-agents/livekit/agents/pipeline/pipeline_agent.py +++ b/livekit-agents/livekit/agents/pipeline/pipeline_agent.py @@ -7,6 +7,7 @@ AsyncIterable, Literal, Tuple, + Optional, ) from livekit import rtc @@ -16,6 +17,7 @@ from . import io from .agent_task import ActiveTask, AgentTask from .speech_handle import SpeechHandle +from .room_io import RoomInput, RoomOutput EventTypes = Literal[ "user_started_speaking", @@ -87,10 +89,28 @@ def __init__( # They can all be overriden by subclasses, by default they use the STT/LLM/TTS specified in the # constructor of the PipelineAgent - def start(self) -> None: + async def start(self, room: Optional[rtc.Room] = None) -> None: + """Start the pipeline agent. + + Args: + room (Optional[rtc.Room]): The LiveKit room. If provided and no input/output audio + is set, automatically configures room audio I/O. + """ if self._started: return + if room is not None: + # configure room I/O if not already set + if self.input.audio is None: + room_input = RoomInput(room=room) + self._input.audio = room_input.audio + await room_input.wait_for_participant() + + if self.output.audio is None: + room_output = RoomOutput(room=room) + self._output.audio = room_output.audio + await room_output.start() + if self.input.audio is not None: self._forward_audio_atask = asyncio.create_task( self._forward_audio_task(), name="_forward_audio_task" From f3b61124dfd50d711911c7b9d2d0db87dd16d099 Mon Sep 17 00:00:00 2001 From: Long Chen Date: Sun, 26 Jan 2025 16:31:00 +0800 Subject: [PATCH 13/15] add buffer to room audio sink --- examples/roomio_worker.py | 20 +++-- .../livekit/agents/pipeline/pipeline_agent.py | 19 +++-- .../livekit/agents/pipeline/room_io.py | 80 ++++++++++++++----- 3 files changed, 84 insertions(+), 35 deletions(-) diff --git a/examples/roomio_worker.py b/examples/roomio_worker.py index c1f76be51..426f25665 100644 --- a/examples/roomio_worker.py +++ b/examples/roomio_worker.py @@ -2,11 +2,11 @@ from dotenv import load_dotenv from livekit import rtc -from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, WorkerType, cli -from livekit.agents.pipeline import AgentTask, ChatCLI, PipelineAgent +from livekit.agents import JobContext, WorkerOptions, WorkerType, cli +from livekit.agents.pipeline import AgentTask, PipelineAgent from livekit.agents.pipeline.io import PlaybackFinishedEvent -from livekit.agents.pipeline.room_io import RoomOutput, RoomInput, RoomInputOptions -from livekit.plugins import cartesia, deepgram, openai, silero +from livekit.agents.pipeline.room_io import RoomInput, RoomInputOptions, RoomOutput +from livekit.plugins import openai logger = logging.getLogger("my-worker") logger.setLevel(logging.INFO) @@ -25,7 +25,15 @@ async def entrypoint(ctx: JobContext): ) # default use RoomIO if room is provided - await agent.start(room=ctx.room) + await agent.start( + room=ctx.room, + room_input_options=RoomInputOptions( + audio_enabled=True, + video_enabled=False, + audio_sample_rate=24000, + audio_num_channels=1, + ), + ) # # Or use RoomInput and RoomOutput explicitly # room_input = RoomInput( @@ -33,6 +41,8 @@ async def entrypoint(ctx: JobContext): # options=RoomInputOptions( # audio_enabled=True, # video_enabled=False, + # audio_sample_rate=24000, + # audio_num_channels=1, # ), # ) # room_output = RoomOutput(ctx.room, sample_rate=24000, num_channels=1) diff --git a/livekit-agents/livekit/agents/pipeline/pipeline_agent.py b/livekit-agents/livekit/agents/pipeline/pipeline_agent.py index 461d4f033..cb8c990b4 100644 --- a/livekit-agents/livekit/agents/pipeline/pipeline_agent.py +++ b/livekit-agents/livekit/agents/pipeline/pipeline_agent.py @@ -1,14 +1,8 @@ from __future__ import annotations, print_function import asyncio -import heapq from dataclasses import dataclass -from typing import ( - AsyncIterable, - Literal, - Tuple, - Optional, -) +from typing import AsyncIterable, Literal, Optional from livekit import rtc @@ -16,8 +10,8 @@ from ..log import logger from . import io from .agent_task import ActiveTask, AgentTask +from .room_io import RoomInput, RoomInputOptions, RoomOutput from .speech_handle import SpeechHandle -from .room_io import RoomInput, RoomOutput EventTypes = Literal[ "user_started_speaking", @@ -89,12 +83,17 @@ def __init__( # They can all be overriden by subclasses, by default they use the STT/LLM/TTS specified in the # constructor of the PipelineAgent - async def start(self, room: Optional[rtc.Room] = None) -> None: + async def start( + self, + room: Optional[rtc.Room] = None, + room_input_options: Optional[RoomInputOptions] = None, + ) -> None: """Start the pipeline agent. Args: room (Optional[rtc.Room]): The LiveKit room. If provided and no input/output audio is set, automatically configures room audio I/O. + room_input_options (Optional[RoomInputOptions]): Options for the room input. """ if self._started: return @@ -102,7 +101,7 @@ async def start(self, room: Optional[rtc.Room] = None) -> None: if room is not None: # configure room I/O if not already set if self.input.audio is None: - room_input = RoomInput(room=room) + room_input = RoomInput(room=room, options=room_input_options) self._input.audio = room_input.audio await room_input.wait_for_participant() diff --git a/livekit-agents/livekit/agents/pipeline/room_io.py b/livekit-agents/livekit/agents/pipeline/room_io.py index 5064cd66b..a7c915440 100644 --- a/livekit-agents/livekit/agents/pipeline/room_io.py +++ b/livekit-agents/livekit/agents/pipeline/room_io.py @@ -1,10 +1,14 @@ import asyncio from dataclasses import dataclass from typing import AsyncIterator, Optional +import threading from livekit import rtc +from ..utils import aio +from .. import utils from .io import AudioSink +from .log import logger @dataclass @@ -170,7 +174,12 @@ class RoomAudioSink(AudioSink): """AudioSink implementation that publishes audio to a LiveKit room""" def __init__( - self, room: rtc.Room, *, sample_rate: int = 24000, num_channels: int = 1 + self, + room: rtc.Room, + *, + sample_rate: int = 24000, + num_channels: int = 1, + capacity: int = 0, ) -> None: """Initialize the RoomAudioSink @@ -178,17 +187,21 @@ def __init__( room: The LiveKit room to publish audio to sample_rate: Sample rate of the audio in Hz num_channels: Number of audio channels + capacity: Capacity of the internal audio queue, 0 means unlimited """ super().__init__(sample_rate=sample_rate) self._room = room # create audio source + self._audio_buffer: aio.Chan[rtc.AudioFrame] = aio.Chan(maxsize=capacity) + self._audio_buffer_lock = threading.Lock() self._audio_source = rtc.AudioSource( sample_rate=sample_rate, num_channels=num_channels ) self._publication: rtc.LocalTrackPublication | None = None - self._pushed_duration: float | None = None + self._main_atask: asyncio.Task[None] | None = None + self._pushed_duration: Optional[float] = None def _on_reconnected(self) -> None: self._publication = None @@ -208,42 +221,69 @@ async def start(self) -> None: track=track, options=rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE), ) + self._main_atask = asyncio.create_task(self._capture_frame_task()) await self._publication.wait_for_subscription() + @utils.log_exceptions(logger=logger) + async def _capture_frame_task(self) -> None: + async for frame in self._audio_buffer: + if frame is None: + if self._pushed_duration is not None: + # end of segment + await self._notify_playback_finished( + self._pushed_duration, interrupted=False + ) + self._pushed_duration = None + continue + + # capture frame + await self._audio_source.capture_frame(frame) + if self._pushed_duration is None: + self._pushed_duration = 0.0 + self._pushed_duration += frame.duration + async def capture_frame(self, frame: rtc.AudioFrame) -> None: """Capture an audio frame and publish it to the room""" await super().capture_frame(frame) - - if self._pushed_duration is None: - self._pushed_duration = 0.0 - - self._pushed_duration += frame.duration - await self._audio_source.capture_frame(frame) + with self._audio_buffer_lock: + await self._audio_buffer.send(frame) def flush(self) -> None: """Flush the current audio segment and notify when complete""" super().flush() - - if self._pushed_duration is not None: - self._notify_playback_finished(self._pushed_duration, interrupted=False) - self._pushed_duration = None + if self._pushed_duration is None: + return + with self._audio_buffer_lock: + self._audio_buffer.send_nowait(None) def clear_buffer(self) -> None: """Clear the audio buffer immediately""" - self._audio_source.clear_queue() + super().clear_buffer() + with self._audio_buffer_lock: + while not self._audio_buffer.empty(): + try: + self._audio_buffer.recv_nowait() + except aio.ChanEmpty: + break + if self._pushed_duration is not None: + self._pushed_duration = max( + 0, self._pushed_duration - self._audio_source.queued_duration + ) + self._audio_source.clear_queue() if self._pushed_duration is not None: - self._notify_playback_finished(self._pushed_duration, interrupted=True) + asyncio.create_task( + self._notify_playback_finished(self._pushed_duration, interrupted=True) + ) self._pushed_duration = None - def _notify_playback_finished( + async def _notify_playback_finished( self, playback_position: float, interrupted: bool ) -> None: """Wait for the audio to be played out and notify when complete""" - playout_task = asyncio.create_task(self._audio_source.wait_for_playout()) - playout_task.add_done_callback( - lambda _: self.on_playback_finished( - playback_position=playback_position, interrupted=interrupted - ) + await self._audio_source.wait_for_playout() + + self.on_playback_finished( + playback_position=playback_position, interrupted=interrupted ) From 7e8c4e14561616d9436abf313f1d2c78d86bdb9d Mon Sep 17 00:00:00 2001 From: Long Chen Date: Sun, 26 Jan 2025 18:29:07 +0800 Subject: [PATCH 14/15] update types --- livekit-agents/livekit/agents/pipeline/room_io.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/livekit-agents/livekit/agents/pipeline/room_io.py b/livekit-agents/livekit/agents/pipeline/room_io.py index a7c915440..cfd623be5 100644 --- a/livekit-agents/livekit/agents/pipeline/room_io.py +++ b/livekit-agents/livekit/agents/pipeline/room_io.py @@ -192,8 +192,8 @@ def __init__( super().__init__(sample_rate=sample_rate) self._room = room - # create audio source - self._audio_buffer: aio.Chan[rtc.AudioFrame] = aio.Chan(maxsize=capacity) + # buffer the audio frames as soon as they are captured + self._audio_buffer: aio.Chan[rtc.AudioFrame | None] = aio.Chan(maxsize=capacity) self._audio_buffer_lock = threading.Lock() self._audio_source = rtc.AudioSource( sample_rate=sample_rate, num_channels=num_channels From 6de5280ce6d5c4c97043692a844abe602fa36643 Mon Sep 17 00:00:00 2001 From: Long Chen Date: Mon, 27 Jan 2025 23:29:56 +0800 Subject: [PATCH 15/15] use queue of audio source for room audio sink --- .../livekit/agents/pipeline/room_io.py | 94 +++++++------------ 1 file changed, 35 insertions(+), 59 deletions(-) diff --git a/livekit-agents/livekit/agents/pipeline/room_io.py b/livekit-agents/livekit/agents/pipeline/room_io.py index cfd623be5..3be659833 100644 --- a/livekit-agents/livekit/agents/pipeline/room_io.py +++ b/livekit-agents/livekit/agents/pipeline/room_io.py @@ -1,12 +1,9 @@ import asyncio from dataclasses import dataclass from typing import AsyncIterator, Optional -import threading from livekit import rtc -from ..utils import aio -from .. import utils from .io import AudioSink from .log import logger @@ -179,7 +176,7 @@ def __init__( *, sample_rate: int = 24000, num_channels: int = 1, - capacity: int = 0, + queue_size_ms: int = 100_000, ) -> None: """Initialize the RoomAudioSink @@ -187,21 +184,23 @@ def __init__( room: The LiveKit room to publish audio to sample_rate: Sample rate of the audio in Hz num_channels: Number of audio channels - capacity: Capacity of the internal audio queue, 0 means unlimited + queue_size_ms: Size of the internal audio queue in ms. + Default to 100s to capture as fast as possible. """ super().__init__(sample_rate=sample_rate) self._room = room # buffer the audio frames as soon as they are captured - self._audio_buffer: aio.Chan[rtc.AudioFrame | None] = aio.Chan(maxsize=capacity) - self._audio_buffer_lock = threading.Lock() self._audio_source = rtc.AudioSource( - sample_rate=sample_rate, num_channels=num_channels + sample_rate=sample_rate, + num_channels=num_channels, + queue_size_ms=queue_size_ms, ) self._publication: rtc.LocalTrackPublication | None = None - self._main_atask: asyncio.Task[None] | None = None self._pushed_duration: Optional[float] = None + self._interrupted: bool = False + self._flush_task: Optional[asyncio.Task[None]] = None def _on_reconnected(self) -> None: self._publication = None @@ -221,69 +220,46 @@ async def start(self) -> None: track=track, options=rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE), ) - self._main_atask = asyncio.create_task(self._capture_frame_task()) - await self._publication.wait_for_subscription() - @utils.log_exceptions(logger=logger) - async def _capture_frame_task(self) -> None: - async for frame in self._audio_buffer: - if frame is None: - if self._pushed_duration is not None: - # end of segment - await self._notify_playback_finished( - self._pushed_duration, interrupted=False - ) - self._pushed_duration = None - continue - - # capture frame - await self._audio_source.capture_frame(frame) - if self._pushed_duration is None: - self._pushed_duration = 0.0 - self._pushed_duration += frame.duration - async def capture_frame(self, frame: rtc.AudioFrame) -> None: """Capture an audio frame and publish it to the room""" await super().capture_frame(frame) - with self._audio_buffer_lock: - await self._audio_buffer.send(frame) + + if self._pushed_duration is None: + self._pushed_duration = 0.0 + self._interrupted = False # reset interrupted flag + self._pushed_duration += frame.duration + await self._audio_source.capture_frame(frame) def flush(self) -> None: """Flush the current audio segment and notify when complete""" super().flush() if self._pushed_duration is None: return - with self._audio_buffer_lock: - self._audio_buffer.send_nowait(None) + if self._flush_task and not self._flush_task.done(): + # shouldn't happen if only one active speech handle at a time + logger.error("flush called while playback is in progress") + self._flush_task.cancel() + self._flush_task = None + + def _playback_finished(task: asyncio.Task[None]) -> None: + self.on_playback_finished( + playback_position=self._pushed_duration, interrupted=self._interrupted + ) + self._pushed_duration = None + self._interrupted = False + + self._flush_task = asyncio.create_task(self._audio_source.wait_for_playout()) + self._flush_task.add_done_callback(_playback_finished) def clear_buffer(self) -> None: """Clear the audio buffer immediately""" super().clear_buffer() - with self._audio_buffer_lock: - while not self._audio_buffer.empty(): - try: - self._audio_buffer.recv_nowait() - except aio.ChanEmpty: - break - if self._pushed_duration is not None: - self._pushed_duration = max( - 0, self._pushed_duration - self._audio_source.queued_duration - ) - self._audio_source.clear_queue() - - if self._pushed_duration is not None: - asyncio.create_task( - self._notify_playback_finished(self._pushed_duration, interrupted=True) - ) - self._pushed_duration = None - - async def _notify_playback_finished( - self, playback_position: float, interrupted: bool - ) -> None: - """Wait for the audio to be played out and notify when complete""" - await self._audio_source.wait_for_playout() + if self._pushed_duration is None: + return - self.on_playback_finished( - playback_position=playback_position, interrupted=interrupted - ) + queued_duration = self._audio_source.queued_duration + self._pushed_duration = max(0, self._pushed_duration - queued_duration) + self._interrupted = True + self._audio_source.clear_queue()