[draft] Room IO for agent v1.0 #1404

Open · wants to merge 14 commits into base: dev-1.0
62 changes: 62 additions & 0 deletions examples/roomio_worker.py
Member:

This looks good, though in my opinion this should just be the default mode. Still, it's nice to show how to use custom IO in the examples.

@@ -0,0 +1,62 @@
import logging

from dotenv import load_dotenv
from livekit import rtc
from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, WorkerType, cli
from livekit.agents.pipeline import AgentTask, ChatCLI, PipelineAgent
from livekit.agents.pipeline.io import PlaybackFinishedEvent
from livekit.agents.pipeline.room_io import RoomAudioSink, RoomInput, RoomInputOptions
from livekit.plugins import cartesia, deepgram, openai, silero

logger = logging.getLogger("my-worker")
logger.setLevel(logging.INFO)

load_dotenv()


async def entrypoint(ctx: JobContext):
    await ctx.connect()

    agent = PipelineAgent(
        task=AgentTask(
            instructions="Talk to me!",
            llm=openai.realtime.RealtimeModel(),
        )
    )
    agent.start()

    # # start a chat inside the CLI
    # chat_cli = ChatCLI(agent)
    # await chat_cli.run()

    room_input = RoomInput(
        ctx.room,
        options=RoomInputOptions(
            audio_enabled=True,
            video_enabled=False,
        ),
    )
    audio_output = RoomAudioSink(ctx.room, sample_rate=24000, num_channels=1)

    agent.input.audio = room_input.audio
    agent.output.audio = audio_output
Member:
Maybe something like this to be consistent:

    room_output = RoomOutput(ctx.room, sample_rate=24000, num_channels=1)

    agent.input.audio = room_input.audio
    agent.output.audio = room_output.audio

Collaborator Author @longcw (Jan 24, 2025):
Yes, this is what I am thinking


    # TODO: the interrupted flag is not set correctly
    @audio_output.on("playback_finished")
    def on_playback_finished(ev: PlaybackFinishedEvent) -> None:
        logger.info(
            "playback_finished",
            extra={
                "playback_position": ev.playback_position,
                "interrupted": ev.interrupted,
            },
        )

    await room_input.wait_for_participant()
    await audio_output.start()


if __name__ == "__main__":
    # WorkerType.ROOM is the default worker type which will create an agent for every room.
    # You can also use WorkerType.PUBLISHER to create a single agent for all participants that publish a track.
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, worker_type=WorkerType.ROOM))
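
To try the example locally, a usage sketch: it assumes the standard LiveKit credentials (LIVEKIT_URL, LIVEKIT_API_KEY, LIVEKIT_API_SECRET) are present in the .env file that load_dotenv() reads:

    python examples/roomio_worker.py dev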
2 changes: 1 addition & 1 deletion livekit-agents/livekit/agents/cli/cli.py
@@ -116,7 +116,7 @@ def dev(
         asyncio_debug=asyncio_debug,
         watch=watch,
         drain_timeout=0,
-        register=False,
+        register=True,
     )

     _run_dev(args)
8 changes: 7 additions & 1 deletion livekit-agents/livekit/agents/llm/chat_context.py
@@ -21,7 +21,7 @@
 )

 from livekit import rtc
-from pydantic import BaseModel, Field
+from pydantic import BaseModel, Field, ConfigDict
 from typing_extensions import TypeAlias

 from .. import utils

@@ -81,12 +81,18 @@ class ImageContent(BaseModel):
     Currently only supported by OpenAI (see https://platform.openai.com/docs/guides/vision?lang=node#low-or-high-fidelity-image-understanding)
     """

+    # temporary fix for pydantic
+    model_config = ConfigDict(arbitrary_types_allowed=True)
+

 class AudioContent(BaseModel):
     type: Literal["audio_content"] = Field(default="audio_content")
     frame: list[rtc.AudioFrame]
     transcript: Optional[str] = None

+    # temporary fix for pydantic before rtc.AudioFrame is supported
+    model_config = ConfigDict(arbitrary_types_allowed=True)
Member:
sounds good, will fix asap
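
For reference, one possible shape for the proper fix: a sketch using pydantic v2's core-schema hook so rtc.AudioFrame validates without arbitrary_types_allowed on every model (the annotation class and alias are hypothetical):

    from typing import Annotated, Any

    from livekit import rtc
    from pydantic import GetCoreSchemaHandler
    from pydantic_core import core_schema


    class _AudioFrameSchema:
        # hypothetical helper: validate by isinstance instead of enabling
        # arbitrary types on the whole model
        @classmethod
        def __get_pydantic_core_schema__(
            cls, source_type: Any, handler: GetCoreSchemaHandler
        ) -> core_schema.CoreSchema:
            return core_schema.is_instance_schema(rtc.AudioFrame)


    # fields would then be declared as list[AudioFrameField]
    AudioFrameField = Annotated[rtc.AudioFrame, _AudioFrameSchema]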



 class ChatMessage(BaseModel):
     id: str = Field(default_factory=lambda: utils.shortuuid("item_"))
222 changes: 222 additions & 0 deletions livekit-agents/livekit/agents/pipeline/room_io.py
@@ -0,0 +1,222 @@
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Optional

from livekit import rtc

from .io import AudioSink


# TODO: add RoomOutput that has audio and video sinks, optionally with av sync?
class RoomAudioSink(AudioSink):
Member:
I think we should create a class that does both directly at the same time (like ChatCLI). We can either add another class or merge both. Not sure about the naming, but I was thinking RoomIO.

Collaborator Author:
Did you mean to set the input and output of the agent when calling RoomIO(agent)? I think this actually makes it less flexible if the user wants to use different input and output sources. For example, for an avatar, the input is RoomInput but the output could be DataStreamAudioSink. wdyt?

Member:
I was thinking that if you don't want a Room as the input, users could simply change only agent.output.audio and leave agent.input.audio empty (but they'd create only one class, which is RoomIO).

Member:
Maybe this is handled directly inside a run method (like the ChatCLI).

Something like

    await room_io.run(input=False)

Collaborator Author:
I still think RoomInput and RoomOutput should be two classes, for these reasons:

  1. They both have audio and video sinks; with a single RoomIO there would be four options: room_io.run(input_audio=True, input_video=False, output_audio=True, output_video=False).
  2. The input and output can differ, as in the avatar use case: the agent uses RoomInput and a data stream output, while the avatar worker uses a data stream input and RoomOutput.
  3. We can make the room input and output the defaults for the agent; then if the user wants to change one of them, it's just agent.output.audio = xxx (see the sketch below).
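
A hypothetical usage sketch of point 3, where DataStreamAudioSink is the avatar-case sink named above and the default room wiring is assumed:

    # input/output default to the room (assumed wiring)
    # for the avatar use case, swap out only the audio output:
    agent.output.audio = DataStreamAudioSink(stream)  # hypothetical sink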

Member:
ah yep, that makes sense!

Collaborator Author:
One question regarding the output: do we want a RoomOutput class that includes both audio and video sinks and can optionally enable AV sync? For now I think just a RoomAudioSink is okay, as most agent use cases publish audio to the room. @davidzhao @theomonnom

Member:
I think we can just do a single audio sink for now, in the spirit of unblocking current functionality.

Member:
Agree with dz, though I think it would be cleaner to have only two classes: RoomInput and RoomOutput. It may be harder for users to understand if they have to initialize the RoomAudioSink.

Collaborator Author @longcw (Jan 24, 2025):
I can make a RoomOutput with RoomOutput().audio that is an AudioSink.
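
A rough sketch of that shape (hypothetical; not part of this diff):

    class RoomOutput:
        """Hypothetical wrapper exposing an AudioSink as .audio"""

        def __init__(
            self, room: rtc.Room, *, sample_rate: int = 24000, num_channels: int = 1
        ) -> None:
            self._audio_sink = RoomAudioSink(
                room, sample_rate=sample_rate, num_channels=num_channels
            )

        async def start(self) -> None:
            await self._audio_sink.start()

        @property
        def audio(self) -> AudioSink:
            return self._audio_sink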

"""AudioSink implementation that publishes audio to a LiveKit room"""

def __init__(
self, room: rtc.Room, *, sample_rate: int = 24000, num_channels: int = 1
) -> None:
"""Initialize the RoomAudioSink

Args:
room: The LiveKit room to publish audio to
sample_rate: Sample rate of the audio in Hz
num_channels: Number of audio channels
"""
super().__init__(sample_rate=sample_rate)
self._room = room

# Create audio source and track
self._audio_source = rtc.AudioSource(
sample_rate=sample_rate, num_channels=num_channels
)
self._track = rtc.LocalAudioTrack.create_audio_track(
"assistant_voice", self._audio_source
)

self._publication: rtc.LocalTrackPublication | None = None
self._publish_task: asyncio.Task | None = None
self._pushed_duration: float | None = None

    async def start(self) -> None:
        """Start publishing the audio track to the room"""
        if self._publication:
            return

        # TODO: handle reconnected, do we need to cancel and re-publish? seems no
        self._publication = await self._room.local_participant.publish_track(
Member:
After reconnection, we indeed need to republish the track

            self._track,
            rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE),
        )

        # is this necessary?
Member:
This is necessary to ensure the agent isn't speaking into the void.

Though it's not necessary to block start, it does need to happen before the first frame is captured.
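
One hedged sketch of that ordering; the attribute name is an assumption:

    # in start(): kick off the wait without blocking
    self._subscribed_fut = asyncio.ensure_future(
        self._publication.wait_for_subscription()
    )

    # in capture_frame(): make sure a subscriber exists before the first frame
    if self._subscribed_fut is not None:
        await self._subscribed_fut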

        await self._publication.wait_for_subscription()

    async def capture_frame(self, frame: rtc.AudioFrame) -> None:
        """Capture an audio frame and publish it to the room"""
        await super().capture_frame(frame)

        if self._pushed_duration is None:
            self._pushed_duration = 0.0

        self._pushed_duration += frame.duration
        await self._audio_source.capture_frame(frame)

    def flush(self) -> None:
        """Flush the current audio segment and notify when complete"""
        super().flush()

        if self._pushed_duration is not None:
            self._notify_playback_finished(self._pushed_duration, interrupted=False)
            self._pushed_duration = None

    def clear_buffer(self) -> None:
        """Clear the audio buffer immediately"""
        self._audio_source.clear_queue()

        if self._pushed_duration is not None:
            self._notify_playback_finished(self._pushed_duration, interrupted=True)
            self._pushed_duration = None

    def _notify_playback_finished(
        self, playback_position: float, interrupted: bool
    ) -> None:
        """Wait for the audio to be played out and notify when complete"""
        playout_task = asyncio.create_task(self._audio_source.wait_for_playout())
        playout_task.add_done_callback(
            lambda _: self.on_playback_finished(
                playback_position=playback_position, interrupted=interrupted
            )
        )

@dataclass
class RoomInputOptions:
    audio_enabled: bool = True
    """Whether to subscribe to audio"""
    video_enabled: bool = False
    """Whether to subscribe to video"""
    audio_sample_rate: int = 16000
Member @theomonnom (Jan 24, 2025):
Ideally, we should automatically detect the sample_rate. I think the right thing to do would be to add a sample_rate field to the AudioSink base class, and the PipelineAgent should resample before using the sink.

Collaborator Author:
One question regarding this: for room input we are using rtc.AudioStream.from_participant. Are the audio sample rate and num_channels actually parameters for a resampler, meaning the output audio frames are resampled to the values in the options?

For the output AudioSink I agree that the sample rate can be a field in the base class for auto resampling.

Member:
The rtc.AudioStream isn't resampling; when setting the sample_rate + num_channels parameters, it expects to receive frames matching those options when calling capture_frame.

Collaborator Author:
How can we know the sample rate and num channels read from a remote track? Or is it always the same value, like 16000 and 1?

Collaborator Author:
I think my question is whether the sample rate and number of channels provided to rtc.AudioStream.from_participant should match the track's sample rate, or whether the audio frames are automatically resampled to these values before being put into the stream:

    @classmethod
    def from_participant(
        cls,
        *,
        participant: Participant,
        track_source: TrackSource.ValueType,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        capacity: int = 0,
        sample_rate: int = 48000,
        num_channels: int = 1,
    ) -> AudioStream:

Member @theomonnom (Jan 24, 2025):
Oh sorry, I got confused: the rtc.AudioStream does indeed resample (the default is always 48000 and 1 channel). But the rtc.AudioSource doesn't (and this is where we should introduce a new sample_rate field on the base class).

Member @theomonnom (Jan 24, 2025):

You can directly use the frames coming from the rtc.AudioStream and inject them into the PipelineAgent. We already resample automatically in the STT for this use case (though it isn't the case for the realtime API, which currently requires 24000; I will create a ticket for it).
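
For reference, a sketch of the resample-before-sink idea from earlier in this thread; rtc.AudioResampler and its exact push() signature are assumptions to check against the SDK:

    from typing import AsyncIterator

    from livekit import rtc

    from .io import AudioSink


    async def feed_resampled(
        frames: AsyncIterator[rtc.AudioFrame],
        sink: AudioSink,
        input_rate: int = 48000,
    ) -> None:
        # resample each incoming frame to the sink's preferred rate
        # (assumes the proposed sample_rate field on the AudioSink base class)
        resampler = rtc.AudioResampler(
            input_rate=input_rate, output_rate=sink.sample_rate
        )
        async for frame in frames:
            for resampled in resampler.push(frame):
                await sink.capture_frame(resampled)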

"""Sample rate of the input audio in Hz"""
audio_num_channels: int = 1
"""Number of audio channels"""
audio_queue_capacity: int = 0
"""Capacity of the internal audio queue, 0 means unlimited"""
video_queue_capacity: int = 0
"""Capacity of the internal video queue, 0 means unlimited"""


DEFAULT_ROOM_INPUT_OPTIONS = RoomInputOptions()


class RoomInput:
    """Creates video and audio streams from a remote participant in a LiveKit room"""

    def __init__(
        self,
        room: rtc.Room,
        participant_identity: Optional[str] = None,
Member:
Let's add TrackPublishOption

        options: RoomInputOptions = DEFAULT_ROOM_INPUT_OPTIONS,
    ) -> None:
        """
        Args:
            room: The LiveKit room to get streams from
            participant_identity: Optional identity of the participant to get streams from.
                If None, will use the first participant that joins.
            options: RoomInputOptions
        """
        self._options = options
        self._room = room
        self._expected_identity = participant_identity
        self._participant: rtc.RemoteParticipant | None = None
        self._closed = False

        # streams
        self._audio_stream: Optional[rtc.AudioStream] = None
        self._video_stream: Optional[rtc.VideoStream] = None

        self._participant_ready = asyncio.Event()
        self._room.on("participant_connected", self._on_participant_connected)
Member:
Let's handle room reconnection. If you get a "reconnected" event, you have to republish the local track.
Also we will have to "refind" the track we want to subscribe to
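
A hedged sketch of that handling; the event name and re-link approach are assumptions:

    # hypothetical: re-create the streams after a reconnection
    def _on_reconnected(self) -> None:
        if self._participant is not None:
            self._link_participant(self._participant)

    self._room.on("reconnected", self._on_reconnected)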


        # try to find participant
        if self._expected_identity is not None:
            participant = self._room.remote_participants.get(self._expected_identity)
            if participant is not None:
                self._link_participant(participant)
        else:
            for participant in self._room.remote_participants.values():
                self._link_participant(participant)
                if self._participant:
                    break

    async def wait_for_participant(self) -> rtc.RemoteParticipant:
        await self._participant_ready.wait()
        assert self._participant is not None
        return self._participant

    @property
    def audio(self) -> AsyncIterator[rtc.AudioFrame] | None:
        if self._audio_stream is None:
            return None

        async def _read_stream():
            async for event in self._audio_stream:
                yield event.frame

        return _read_stream()

    @property
    def video(self) -> AsyncIterator[rtc.VideoFrame] | None:
        if self._video_stream is None:
            return None

        async def _read_stream():
            async for event in self._video_stream:
                yield event.frame

        return _read_stream()

    def _link_participant(self, participant: rtc.RemoteParticipant) -> None:
        if (
            self._expected_identity is not None
            and participant.identity != self._expected_identity
        ):
            return

        self._participant = participant

        # set up tracks
        if self._options.audio_enabled:
            self._audio_stream = rtc.AudioStream.from_participant(
                participant=participant,
                track_source=rtc.TrackSource.SOURCE_MICROPHONE,
                sample_rate=self._options.audio_sample_rate,
                num_channels=self._options.audio_num_channels,
                capacity=self._options.audio_queue_capacity,
            )
        if self._options.video_enabled:
            self._video_stream = rtc.VideoStream.from_participant(
                participant=participant,
                track_source=rtc.TrackSource.SOURCE_CAMERA,
                capacity=self._options.video_queue_capacity,
            )

        self._participant_ready.set()

    def _on_participant_connected(self, participant: rtc.RemoteParticipant) -> None:
        if self._participant is not None:
            return
        self._link_participant(participant)

    async def aclose(self) -> None:
        if self._closed:
            raise RuntimeError("RoomInput already closed")

        self._closed = True
        self._room.off("participant_connected", self._on_participant_connected)
        self._participant = None

        if self._audio_stream is not None:
            await self._audio_stream.aclose()
            self._audio_stream = None
        if self._video_stream is not None:
            await self._video_stream.aclose()
            self._video_stream = None
13 changes: 12 additions & 1 deletion livekit-agents/livekit/agents/pipeline/speech_handle.py
@@ -79,6 +79,17 @@ def _mark_playout_done(self) -> None:
         self._playout_done_fut.set_result(None)

     async def wait_until_interrupted(self, aw: list[Awaitable]) -> None:
+        temp_tasks = []
+        tasks = []
+        for task in aw:
+            if not isinstance(task, asyncio.Task):
+                task = asyncio.create_task(task)
+                temp_tasks.append(task)
+            tasks.append(task)
+
         await asyncio.wait(
-            [*aw, self._interrupt_fut], return_when=asyncio.FIRST_COMPLETED
+            [*tasks, self._interrupt_fut], return_when=asyncio.FIRST_COMPLETED
         )
+        for task in temp_tasks:
+            if not task.done():
+                task.cancel()
Comment on lines +82 to +95
Member:
That's pretty annoying, will check if I can handle that differently (I'm on Python 3.9 so it worked).
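
For context: asyncio.wait() deprecated passing bare coroutines in Python 3.8 and rejects them entirely in 3.11+, which is why the wrapping above is needed. The same normalization could live in a small helper, sketched here:

    import asyncio
    from typing import Awaitable, List


    def _as_tasks(aws: List[Awaitable]) -> List[asyncio.Task]:
        # ensure_future returns Tasks unchanged and wraps coroutines
        return [asyncio.ensure_future(aw) for aw in aws]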

@@ -13,7 +13,7 @@
 # limitations under the License.


-from . import beta, realtime
+from . import realtime
 from .embeddings import EmbeddingData, create_embeddings
 from .llm import LLM, LLMStream
 from .models import TTSModels, TTSVoices, WhisperModels
@@ -27,7 +27,7 @@
     "LLM",
     "LLMStream",
     "WhisperModels",
-    "beta",
+    # "beta",
     "TTSModels",
     "TTSVoices",
     "create_embeddings",