Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.0 #1364

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
15 changes: 11 additions & 4 deletions examples/minimal_worker.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
import logging

from dotenv import load_dotenv
from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, WorkerType, cli
from livekit.agents.pipeline import ChatCLI, PipelineAgent
from livekit.plugins import deepgram, openai, cartesia

logger = logging.getLogger("my-worker")
logger.setLevel(logging.INFO)

load_dotenv()

async def entrypoint(ctx: JobContext):
logger.info("starting entrypoint")

async def entrypoint(ctx: JobContext):
await ctx.connect(auto_subscribe=AutoSubscribe.SUBSCRIBE_ALL)

logger.info("connected to the room")
# add your agent logic here!
agent = PipelineAgent(llm=openai.LLM(), stt=deepgram.STT(), tts=cartesia.TTS())
agent.start()

# start a chat inside the CLI
chat_cli = ChatCLI(agent)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as discussed, let's save ChatCLI for later, until echo issues are resolved locally

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, this isn't done, there is no Room implementation yet

await chat_cli.run()


if __name__ == "__main__":
Expand Down
2 changes: 0 additions & 2 deletions livekit-agents/livekit/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
tts,
utils,
vad,
voice_assistant,
)
from ._exceptions import (
APIConnectionError,
Expand Down Expand Up @@ -72,7 +71,6 @@
"transcription",
"pipeline",
"multimodal",
"voice_assistant",
"cli",
"AssignmentTimeoutError",
"APIConnectionError",
Expand Down
145 changes: 97 additions & 48 deletions livekit-agents/livekit/agents/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
import pathlib
import signal
import sys
import threading

import click
from livekit.protocol import models

from .. import utils
from ..log import logger
from ..plugin import Plugin
from ..worker import Worker, WorkerOptions
from ..worker import JobExecutorType, Worker, WorkerOptions
from . import proto
from .log import setup_logging

Expand Down Expand Up @@ -58,6 +58,7 @@ def start(
log_level=log_level,
devmode=False,
asyncio_debug=False,
register=True,
watch=False,
drain_timeout=drain_timeout,
)
Expand Down Expand Up @@ -105,7 +106,63 @@ def dev(
asyncio_debug: bool,
watch: bool,
) -> None:
_run_dev(opts, log_level, url, api_key, api_secret, asyncio_debug, watch)
opts.ws_url = url or opts.ws_url
opts.api_key = api_key or opts.api_key
opts.api_secret = api_secret or opts.api_secret
args = proto.CliArgs(
opts=opts,
log_level=log_level,
devmode=True,
asyncio_debug=asyncio_debug,
watch=watch,
drain_timeout=0,
register=False,
)

_run_dev(args)

@cli.command(help="Start a new chat")
@click.option(
"--url",
envvar="LIVEKIT_URL",
help="LiveKit server or Cloud project's websocket URL",
)
@click.option(
"--api-key",
envvar="LIVEKIT_API_KEY",
help="LiveKit server or Cloud project's API key",
)
@click.option(
"--api-secret",
envvar="LIVEKIT_API_SECRET",
help="LiveKit server or Cloud project's API secret",
)
def chat(
url: str,
api_key: str,
api_secret: str,
) -> None:
# keep everything inside the same process when using the chat mode
opts.job_executor_type = JobExecutorType.THREAD
opts.ws_url = url or opts.ws_url
opts.api_key = api_key or opts.api_key
opts.api_secret = api_secret or opts.api_secret

chat_name = utils.shortuuid("chat_cli_")

args = proto.CliArgs(
opts=opts,
log_level="ERROR",
devmode=True,
asyncio_debug=False,
watch=False,
drain_timeout=0,
register=False,
simulate_job=proto.SimulateJobArgs(
room=chat_name,
),
)
_run_dev(args)

@cli.command(help="Connect to a specific room")
@click.option(
Expand Down Expand Up @@ -155,18 +212,25 @@ def connect(
room: str,
participant_identity: str,
) -> None:
_run_dev(
opts,
log_level,
url,
api_key,
api_secret,
asyncio_debug,
watch,
room,
participant_identity,
opts.ws_url = url or opts.ws_url
opts.api_key = api_key or opts.api_key
opts.api_secret = api_secret or opts.api_secret
args = proto.CliArgs(
opts=opts,
log_level=log_level,
devmode=True,
register=False,
asyncio_debug=asyncio_debug,
watch=watch,
drain_timeout=0,
simulate_job=proto.SimulateJobArgs(
room=room,
participant_identity=participant_identity,
),
)

_run_dev(args)

@cli.command(help="Download plugin dependency files")
@click.option(
"--log-level",
Expand All @@ -188,34 +252,12 @@ def download_files(log_level: str) -> None:


def _run_dev(
opts: WorkerOptions,
log_level: str,
url: str,
api_key: str,
api_secret: str,
asyncio_debug: bool,
watch: bool,
room: str = "",
participant_identity: str = "",
args: proto.CliArgs,
):
opts.ws_url = url or opts.ws_url
opts.api_key = api_key or opts.api_key
opts.api_secret = api_secret or opts.api_secret
args = proto.CliArgs(
opts=opts,
log_level=log_level,
devmode=True,
asyncio_debug=asyncio_debug,
watch=watch,
drain_timeout=0,
room=room,
participant_identity=participant_identity,
)

if watch:
if args.watch:
from .watcher import WatchServer

setup_logging(log_level, args.devmode)
setup_logging(args.log_level, args.devmode)
main_file = pathlib.Path(sys.argv[0]).parent

async def _run_loop():
Expand All @@ -236,27 +278,34 @@ def run_worker(args: proto.CliArgs) -> None:
setup_logging(args.log_level, args.devmode)
args.opts.validate_config(args.devmode)

loop = asyncio.get_event_loop()
worker = Worker(args.opts, devmode=args.devmode, loop=loop)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

worker = Worker(args.opts, devmode=args.devmode, register=args.register, loop=loop)

loop.set_debug(args.asyncio_debug)
loop.slow_callback_duration = 0.1 # 100ms
utils.aio.debug.hook_slow_callbacks(2)

if args.room and args.reload_count == 0:
# directly connect to a specific room
@worker.once("worker_registered")
def _connect_on_register(worker_id: str, server_info: models.ServerInfo):
logger.info("connecting to room %s", args.room)
loop.create_task(worker.simulate_job(args.room, args.participant_identity))
@worker.once("worker_started")
def _worker_started():
if args.simulate_job and args.reload_count == 0:
logger.info("connecting to room %s", args.simulate_job.room)
loop.create_task(
worker.simulate_job(
args.simulate_job.room, args.simulate_job.participant_identity
)
)

try:

def _signal_handler():
raise KeyboardInterrupt

for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, _signal_handler)
if threading.current_thread() is threading.main_thread():
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, _signal_handler)

except NotImplementedError:
# TODO(theomonnom): add_signal_handler is not implemented on win
pass
Expand Down
13 changes: 11 additions & 2 deletions livekit-agents/livekit/agents/cli/proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
from ..worker import WorkerOptions


@dataclass
class SimulateJobArgs:
room: str = ""
participant_identity: str = ""


@dataclass
class CliArgs:
opts: WorkerOptions
Expand All @@ -20,8 +26,11 @@ class CliArgs:
asyncio_debug: bool
watch: bool
drain_timeout: int
room: str = ""
participant_identity: str = ""

# register the worker to the worker pool
register: bool = True

simulate_job: SimulateJobArgs | None = None

# amount of time this worker has been reloaded
reload_count: int = 0
Expand Down
8 changes: 8 additions & 0 deletions livekit-agents/livekit/agents/debug/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from .tracing import Tracing, TracingGraph, TracingHandle


__all__ = [
"Tracing",
"TracingGraph",
"TracingHandle",
]
Loading