v1.0 #1364

Open
wants to merge 16 commits into
base: main
Conversation

theomonnom
Member

No description provided.

changeset-bot bot commented Jan 13, 2025

⚠️ No Changeset found

Latest commit: 1bbbdfd

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

        data.generated_text += chunk
        text_ch.send_nowait(chunk)

    elif isinstance(chunk, ChatChunk):
Contributor

Maybe an else with a log or exception?

Member Author

Sure, good idea in case users give us random stuff
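
A minimal sketch of the fallback branch suggested above, assuming the loop receives either plain strings or chat chunks. `ChatChunk` here is a hypothetical stand-in dataclass (not the real chunk type from the PR), and `collect_text` and its `strict` flag are illustrative names only:

```python
import logging
from dataclasses import dataclass
from typing import Iterable

logger = logging.getLogger(__name__)


@dataclass
class ChatChunk:
    """Hypothetical stand-in for the real chunk type; only a text delta is modeled."""

    content: str


def collect_text(chunks: Iterable[object], *, strict: bool = False) -> str:
    """Concatenate the text of known chunk types and report anything unexpected."""
    generated_text = ""
    for chunk in chunks:
        if isinstance(chunk, str):
            generated_text += chunk
        elif isinstance(chunk, ChatChunk):
            generated_text += chunk.content
        else:
            # The branch the review asks for: never drop unknown input silently.
            if strict:
                raise TypeError(f"unexpected chunk type: {type(chunk).__name__}")
            logger.warning("ignoring chunk of unexpected type %s", type(chunk).__name__)
    return generated_text
```

Whether to log or raise is a design choice: logging keeps a running agent alive when user code yields something odd, while raising surfaces the mistake earlier during development.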

    agent.start()

    # start a chat inside the CLI
    chat_cli = ChatCLI(agent)
Member

as discussed, let's save ChatCLI for later, until echo issues are resolved locally

Member Author

right, this isn't done, there is no Room implementation yet

livekit-agents/livekit/agents/debug/index.html (outdated, resolved)
livekit-agents/livekit/agents/http_server.py (resolved)
Member

I would rename this package too, from multimodal to something like realtime

Member Author

realtime.RealtimeModel?

Member

Sure. I think the main differences between the models are:

  • HTTP-based vs WebSocket
  • Stateless vs stateful

so realtime seems like as good a descriptor as any?
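
To make the stateless/stateful distinction concrete, here is a toy sketch. All class and method names (`StatelessModel`, `StatefulSession`, `complete`, `send`) are hypothetical and are not the API proposed in this PR. The stateless shape receives the whole history on each call, while the stateful one keeps it between calls:

```python
from typing import Protocol


class StatelessModel(Protocol):
    """HTTP-style: every request carries the whole conversation."""

    def complete(self, history: list[str]) -> str: ...


class StatefulSession(Protocol):
    """WebSocket-style: the session remembers earlier turns."""

    def send(self, message: str) -> str: ...


class EchoStateless:
    def complete(self, history: list[str]) -> str:
        # Nothing is stored; the caller must resend the history every time.
        return f"seen {len(history)} message(s)"


class EchoStateful:
    def __init__(self) -> None:
        self._history: list[str] = []

    def send(self, message: str) -> str:
        # The session accumulates state, so the caller sends only the new turn.
        self._history.append(message)
        return f"seen {len(self._history)} message(s)"
```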

        self._run_eou_detection(self._agent.chat_ctx)

    def _run_eou_detection(self, chat_ctx: llm.ChatContext) -> None:
        if not self._audio_transcript:
Member

If we've triggered an interruption at VAD start of speech, we can't skip this; otherwise it will recreate the bug where the agent becomes "stuck".

    def __init__(self, video_changed: Callable, audio_changed: Callable) -> None:
        self._video_stream: VideoStream | None = None
        self._audio_stream: AudioStream | None = None
        self._video_changed = video_changed
Member

nit: should we keep our _cb suffix to indicate it's a callback?

Member Author

It's internal, but yes, we can

        return self._sample_rate

    @abstractmethod
    async def capture_frame(self, frame: rtc.AudioFrame) -> None:
Member

do we want to keep our capture_frame terminology? or start switching to push_frame like we use elsewhere?


    class TextSink(ABC):
        @abstractmethod
        async def capture_text(self, text: str) -> None:
Member

Is there a way to update existing text? I'm thinking of the case where we push interim transcripts out.

Member Author
theomonnom, Jan 20, 2025

No, there isn't. This isn't for the transcript, it's for the LLM output. Transcripts aren't going to be a stream; they're going to be exposed as events.
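
A rough sketch of what "transcripts as events" could look like from the consumer side. Everything here is an assumption made for illustration: the `EventEmitter` class, the `TranscriptEvent` payload, and the `"user_transcript"` event name do not come from this PR. The point is only that an event can carry an `is_final` flag, so interim results replace each other without an append-only text stream needing an update operation:

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable


@dataclass
class TranscriptEvent:
    """Hypothetical payload: interim events are superseded by the final one."""

    text: str
    is_final: bool


class EventEmitter:
    """Tiny synchronous emitter, just enough for the example."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Callable[[TranscriptEvent], None]]] = defaultdict(list)

    def on(self, event: str, handler: Callable[[TranscriptEvent], None]) -> None:
        self._handlers[event].append(handler)

    def emit(self, event: str, payload: TranscriptEvent) -> None:
        # Copy the handler list so a handler may register others while we iterate.
        for handler in list(self._handlers[event]):
            handler(payload)
```

A consumer would keep only the latest interim text and commit it when `is_final` is true, which is the "update existing text" behaviour asked about above.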

    "agent_stopped_speaking",
    "user_message_committed",
    "agent_message_committed",
    "agent_message_interrupted",
Member

Do we still need to add metrics?

what about these?
"function_calls_collected",
"function_calls_finished",

Member Author

yes this isn't done yet

livekit-agents/livekit/agents/utils/_message_change.py (outdated, resolved)
        self.__capturing = False

    @abstractmethod
    def clear_buffer(self) -> None:
Collaborator

Should flush and clear_buffer also be async?
For the avatar use case, clear_buffer needs to call response = await room.local_participant.perform_rpc to notify the remote participant.

Member Author
theomonnom, Jan 22, 2025

In the current implementation, we assume it happens instantly (so we're not waiting).
If we change those functions to async, we may have to add more synchronization primitives where the agent uses them.

We don't have to be consistent, but as a note, TTS and STT don't have async methods either.

Collaborator

So for the await room.local_participant.perform_rpc("interrupt_playback") call in clear_buffer, is it OK if we use create_task?

Member Author

IMO it is OK, but we can discuss further if we think the developer experience for people writing custom sinks is bad.
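
A minimal sketch of the create_task approach discussed in this thread, assuming clear_buffer is always called from inside a running event loop. `RemotePlaybackSink`, `notify_remote`, and `aclose` are hypothetical names; `notify_remote` stands in for an async call such as `room.local_participant.perform_rpc(...)` and is injected so the example runs without a room:

```python
import asyncio
from typing import Any, Callable, Coroutine


class RemotePlaybackSink:
    """Hypothetical sink whose clear_buffer() stays synchronous."""

    def __init__(self, notify_remote: Callable[[str], Coroutine[Any, Any, None]]) -> None:
        self._notify_remote = notify_remote
        # Hold strong references: the event loop only keeps weak ones to tasks.
        self._pending: set[asyncio.Task] = set()

    def clear_buffer(self) -> None:
        # Fire-and-forget: schedule the notification and return immediately.
        task = asyncio.create_task(self._notify_remote("interrupt_playback"))
        self._pending.add(task)
        task.add_done_callback(self._pending.discard)

    async def aclose(self) -> None:
        # Let outstanding notifications finish (or fail) before shutting down.
        if self._pending:
            await asyncio.gather(*self._pending, return_exceptions=True)
```

The trade-off is the one raised above: the caller is not blocked, but it also cannot tell when (or whether) the remote side actually cleared its buffer unless the sink exposes that separately.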

        self._nested_speech_done_fut.set_result(None)

    async def wait_until_interrupted(self, aw: list[Awaitable]) -> None:
        await asyncio.wait(
            [*aw, self._interrupt_fut], return_when=asyncio.FIRST_COMPLETED
Collaborator

When I tested this on Python 3.11, it raised TypeError: Passing coroutines is forbidden, use tasks explicitly. (https://docs.python.org/3/library/asyncio-task.html#waiting-primitives)

Perhaps the following is needed:

    async def wait_until_interrupted(self, aw: list[Awaitable]) -> None:
        aw = [asyncio.create_task(task) for task in aw]
        await asyncio.wait(
            [*aw, self._interrupt_fut], return_when=asyncio.FIRST_COMPLETED
        )
        for task in aw:
            if not task.done():
                task.cancel()

Collaborator

Or, slightly more complicated, if aw may already contain tasks:

    async def wait_until_interrupted(self, aw: list[Awaitable]) -> None:
        temp_tasks = []
        tasks = []
        for task in aw:
            if not isinstance(task, asyncio.Task):
                task = asyncio.create_task(task)
                temp_tasks.append(task)
            tasks.append(task)

        await asyncio.wait([*tasks, self._interrupt_fut], return_when=asyncio.FIRST_COMPLETED)
        for task in temp_tasks:
            if not task.done():
                task.cancel()
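
For reference, a self-contained variant of the same idea that can be run outside the agent. It is a sketch, not the PR's implementation: it is a free function that takes the interrupt future as a parameter and returns a bool (both assumptions made so the example is testable), and it uses `asyncio.isfuture` / `asyncio.ensure_future` to tell caller-owned futures from bare coroutines:

```python
import asyncio
from typing import Awaitable, Iterable


async def wait_until_interrupted(
    aws: Iterable[Awaitable], interrupt_fut: asyncio.Future
) -> bool:
    """Wait for the first of `aws` or `interrupt_fut`; return True if interrupted."""
    owned: list[asyncio.Future] = []
    waitables: list[asyncio.Future] = []
    for aw in aws:
        if asyncio.isfuture(aw):
            # Existing Future/Task: wait on it, but leave its lifetime to the caller.
            waitables.append(aw)
        else:
            # Bare coroutine or other awaitable: wrap it and remember that we own it.
            fut = asyncio.ensure_future(aw)
            owned.append(fut)
            waitables.append(fut)

    try:
        await asyncio.wait(
            [*waitables, interrupt_fut], return_when=asyncio.FIRST_COMPLETED
        )
    finally:
        # Only cancel what this function created.
        for fut in owned:
            if not fut.done():
                fut.cancel()
    return interrupt_fut.done()
```

Cancelling only the wrappers created here keeps the behaviour of the second suggestion above: tasks passed in by the caller are left running.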

5 participants