Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added retry functionality and a new callback to the UserIdleProcessor #1081

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- Enhanced `UserIdleProcessor` with retry functionality and control over idle
monitoring via new callback signature `(processor, retry_count) -> bool`.
Updated the `17-detect-user-idle.py` to show how to use the `retry_count`.

- Modified `TranscriptProcessor` to use TTS text frames for more accurate assistant
transcripts. Assistant messages are now aggregated based on bot speaking boundaries
rather than LLM context, providing better handling of interruptions and partial
Expand Down
42 changes: 31 additions & 11 deletions examples/foundational/17-detect-user-idle.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from runner import configure

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.frames.frames import EndFrame, LLMMessagesFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
Expand Down Expand Up @@ -63,16 +63,36 @@ async def main():
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)

async def user_idle_callback(user_idle: UserIdleProcessor):
messages.append(
{
"role": "system",
"content": "Ask the user if they are still there and try to prompt for some input, but be short.",
}
)
await user_idle.push_frame(LLMMessagesFrame(messages))

user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0)
async def handle_user_idle(user_idle: UserIdleProcessor, retry_count: int) -> bool:
if retry_count == 1:
# First attempt: Add a gentle prompt to the conversation
messages.append(
{
"role": "system",
"content": "The user has been quiet. Politely and briefly ask if they're still there.",
}
)
await user_idle.push_frame(LLMMessagesFrame(messages))
return True
elif retry_count == 2:
# Second attempt: More direct prompt
messages.append(
{
"role": "system",
"content": "The user is still inactive. Ask if they'd like to continue our conversation.",
}
)
await user_idle.push_frame(LLMMessagesFrame(messages))
return True
else:
# Third attempt: End the conversation
await user_idle.push_frame(
TTSSpeakFrame("It seems like you're busy right now. Have a nice day!")
)
await task.queue_frame(EndFrame())
return False

user_idle = UserIdleProcessor(callback=handle_user_idle, timeout=5.0)

pipeline = Pipeline(
[
Expand Down
77 changes: 65 additions & 12 deletions src/pipecat/processors/user_idle_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#

import asyncio
from typing import Awaitable, Callable
from typing import Awaitable, Callable, Union

from pipecat.frames.frames import (
BotSpeakingFrame,
Expand All @@ -25,11 +25,23 @@ class UserIdleProcessor(FrameProcessor):
or BotSpeaking).
Args:
callback: Function to call when user is idle
callback: Function to call when user is idle. Can be either:
- Basic callback(processor) -> None
- Retry callback(processor, retry_count) -> bool
Return True to continue monitoring for idle events,
Return False to stop the idle monitoring task
timeout: Seconds to wait before considering user idle
**kwargs: Additional arguments passed to FrameProcessor
Example:
# Retry callback:
async def handle_idle(processor: "UserIdleProcessor", retry_count: int) -> bool:
if retry_count < 3:
await send_reminder("Are you still there?")
return True
return False
# Basic callback:
async def handle_idle(processor: "UserIdleProcessor") -> None:
await send_reminder("Are you still there?")
Expand All @@ -42,24 +54,60 @@ async def handle_idle(processor: "UserIdleProcessor") -> None:
def __init__(
self,
*,
callback: Callable[["UserIdleProcessor"], Awaitable[None]],
callback: Union[
Callable[["UserIdleProcessor"], Awaitable[None]], # Basic
Callable[["UserIdleProcessor", int], Awaitable[bool]], # Retry
],
timeout: float,
**kwargs,
):
super().__init__(**kwargs)
self._callback = callback
self._callback = self._wrap_callback(callback)
self._timeout = timeout
self._retry_count = 0
self._interrupted = False
self._conversation_started = False
self._idle_task = None
self._idle_event = asyncio.Event()

def _create_idle_task(self):
"""Create the idle task if it hasn't been created yet."""
def _wrap_callback(
self,
callback: Union[
Callable[["UserIdleProcessor"], Awaitable[None]],
Callable[["UserIdleProcessor", int], Awaitable[bool]],
],
) -> Callable[["UserIdleProcessor", int], Awaitable[bool]]:
"""Wraps callback to support both basic and retry signatures.
Args:
callback: The callback function to wrap.
Returns:
A wrapped callback that returns bool to indicate whether to continue monitoring.
"""

async def wrapper(processor: "UserIdleProcessor", retry_count: int) -> bool:
if len(callback.__code__.co_varnames) == 1:
# Basic callback
await callback(processor) # type: ignore
return True
else:
# Retry callback
return await callback(processor, retry_count) # type: ignore

return wrapper

def _create_idle_task(self) -> None:
"""Creates the idle task if it hasn't been created yet."""
if self._idle_task is None:
self._idle_task = self.get_event_loop().create_task(self._idle_task_handler())

async def _stop(self):
@property
def retry_count(self) -> int:
"""Returns the current retry count."""
return self._retry_count

async def _stop(self) -> None:
"""Stops and cleans up the idle monitoring task."""
if self._idle_task is not None:
self._idle_task.cancel()
Expand All @@ -69,7 +117,7 @@ async def _stop(self):
pass # Expected when task is cancelled
self._idle_task = None

async def process_frame(self, frame: Frame, direction: FrameDirection):
async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
"""Processes incoming frames and manages idle monitoring state.
Args:
Expand Down Expand Up @@ -98,6 +146,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
if self._conversation_started:
# We shouldn't call the idle callback if the user or the bot are speaking
if isinstance(frame, UserStartedSpeakingFrame):
self._retry_count = 0 # Reset retry count when user speaks
self._interrupted = True
self._idle_event.set()
elif isinstance(frame, UserStoppedSpeakingFrame):
Expand All @@ -106,22 +155,26 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
elif isinstance(frame, BotSpeakingFrame):
self._idle_event.set()

async def cleanup(self):
async def cleanup(self) -> None:
"""Cleans up resources when processor is shutting down."""
if self._idle_task: # Only stop if task exists
await self._stop()

async def _idle_task_handler(self):
async def _idle_task_handler(self) -> None:
"""Monitors for idle timeout and triggers callbacks.
Runs in a loop until cancelled.
Runs in a loop until cancelled or callback indicates completion.
"""
while True:
try:
await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)
except asyncio.TimeoutError:
if not self._interrupted:
await self._callback(self)
self._retry_count += 1
should_continue = await self._callback(self, self._retry_count)
if not should_continue:
await self._stop()
break
except asyncio.CancelledError:
break
finally:
Expand Down
Loading