Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[draft] Room IO for agent v1.0 #1404

Open
wants to merge 12 commits into
base: dev-1.0
Choose a base branch
from
Open

[draft] Room IO for agent v1.0 #1404

wants to merge 12 commits into from

Conversation

longcw
Copy link
Collaborator

@longcw longcw commented Jan 23, 2025

No description provided.

Copy link

changeset-bot bot commented Jan 23, 2025

⚠️ No Changeset found

Latest commit: bdb5c65

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR


class AudioContent(BaseModel):
type: Literal["audio_content"] = Field(default="audio_content")
frame: list[rtc.AudioFrame]
transcript: Optional[str] = None

# temporary fix for pydantic before rtc.AudioFrame is supported
model_config = ConfigDict(arbitrary_types_allowed=True)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good, will fix asap

Comment on lines +82 to +95
temp_tasks = []
tasks = []
for task in aw:
if not isinstance(task, asyncio.Task):
task = asyncio.create_task(task)
temp_tasks.append(task)
tasks.append(task)

await asyncio.wait(
[*aw, self._interrupt_fut], return_when=asyncio.FIRST_COMPLETED
[*tasks, self._interrupt_fut], return_when=asyncio.FIRST_COMPLETED
)
for task in temp_tasks:
if not task.done():
task.cancel()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's pretty annoying, will check if I can handle that differently (I'm on python 3.9 so it worked)

from .io import AudioSink, AudioStream, VideoStream


class RoomAudioSink(AudioSink):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should create another class that does both directly at the same time (like ChatCLI).
We can either create another class or merge both. Not sure about the naming but I was thinking about RoomIO

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you mean to set the input and output of agent when calling RoomIO(agent). But I think this actually make it less flexible if user want to use some different input and output sources, for example for avatar, the input is RoomInput, the output could be DataStreamAudioSink. wdyt?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that if you don't want a Room as the Input. The users could simply only change the agent.output.audio and leave agent.input.audio empty (But they create only one class which is RoomIO)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this is handled directly inside a run method (like the ChatCLI).

Something like

await room_io.run(input=False)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think RoomInput and RoomOutput could be two classes for the reasons

  1. they both have the audio and video sinks, if in a single RoomIO then there will be four options room_io.run(input_audio=True, input_video=False, output_audio=True, output_video=False)
  2. the input and output can be different as the avatar use case: agent uses RoomInput and datastream output, avatar worker uses data stream input and RoomOutput
  3. We can make the Room input and output as the default for the agent, then if user want to change one of them it's just agent.output.audio = xxx

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah yep, that makes sense!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one question: regarding the output, do we want a RoomOutput class including both audio and video sinks, and optionally can enable the av sync? For now I think just a RoomAudioSink is okay as most of the use cases of agent is to publish audios to the room. @davidzhao @theomonnom

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can just do a single audio sink for now.. in spirit of unblocking current functionality

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with dz, tho I think it would be cleaner to have only 2 class: RoomInput and RoomOutput.
It may be harder for the users to understand if they have to initialize the RoomAudioSink

Copy link
Collaborator Author

@longcw longcw Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can make a RoomOutput with RoomOutput().audio that is an AudioSink.

)

self._participant_ready = asyncio.Event()
self._room.on("participant_connected", self._on_participant_connected)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's handle room reconnection. If you get a "reconnected" event, you have to republish the local track.
Also we will have to "refind" the track we want to subscribe to

def __init__(
self,
room: rtc.Room,
participant_identity: Optional[str] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add TrackPublishOption

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this lg. though in my opinion this should just be the default mode, but still nice to show how to use custom IO in examples

rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE),
)

# is this necessary?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is necessary to ensure the agent isn't speaking into the void..

though it's not necessary to block start.. but it needs to happen before the first frame is captured


if self._pushed_duration is not None:
# Notify that playback finished
self.on_playback_finished(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this wait for audio_source.wait_for_playout before notifying?

return self._participant

@property
def audio(self) -> AudioStream | None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

neat.. lets us swap sources without disrupting downstream

"""Whether to subscribe to audio"""
video_enabled: bool = False
"""Whether to subscribe to video"""
audio_sample_rate: int = 16000
Copy link
Member

@theomonnom theomonnom Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, we should automatically detect the sample_rate.
I think the right thing to do would be to add a sample_rate field to the AudioSink baseclass.
And the PipelineAgent should resample before using the sink

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one question regarding this, for room input we are using rtc.AudioStream.from_participant, are the audio sample rate and num_channels actually for resampler, that means the output audio frame is resampled to the value in option?

For output AudioSink I agree that the sample rate can be a field in base class for auto resampling.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rtc.AudioStream isn't resampling, when setting a sample_rate + num_channels parameters, it expects to receive frames matching those options when calling capture_frame

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how can we know what is the frame rate and num channels read from a remote track? or it's always the same value like 16000 and 1?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think my question is whether the sample rate and number of channels provided to rtc.AudioStream.from_participant should match the track's sample rate, or if the audio frames will be automatically resampled to these values before putting to the stream?

@classmethod
    def from_participant(
        cls,
        *,
        participant: Participant,
        track_source: TrackSource.ValueType,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        capacity: int = 0,
        sample_rate: int = 48000,
        num_channels: int = 1,
    ) -> AudioStream:

Copy link
Member

@theomonnom theomonnom Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh sorry, I got confused, so the rtc.AudioStream does indeed resample (the default is always 48000 & 1 channel).
But the rtc.AudioSource doesn't (and this is where we should introduce a new sample_rate field on the baseclass)

Copy link
Member

@theomonnom theomonnom Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can directly use the frames coming from the rtc.AudioStream and inject them to the PipelineAgent. We already automatically resample on the STT for this usecase (Tho it isn't the case for the realtime API that currently require 24000, I will create a ticket for it)

Comment on lines 39 to 42
audio_output = RoomAudioSink(ctx.room, sample_rate=24000, num_channels=1)

agent.input.audio = room_input.audio
agent.output.audio = audio_output
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe something like this to be consistent:

    room_output = RoomOutput(ctx.room, sample_rate=24000, num_channels=1)

    agent.input.audio = room_input.audio
    agent.output.audio = room_output.audio

Copy link
Collaborator Author

@longcw longcw Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is what I am thinking

return

# TODO: handle reconnected, do we need to cancel and re-publish? seems no
self._publication = await self._room.local_participant.publish_track(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After reconnection, we indeed need to republish the track

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants