From 5f9671e2ca1a855805b00b09c98d3204e8dba322 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Fri, 24 Jan 2025 09:57:52 -0500 Subject: [PATCH 1/2] Added retry functionality and a new callback to the UserIdleProcessor --- CHANGELOG.md | 6 +- examples/foundational/17-detect-user-idle.py | 42 +++++++--- src/pipecat/processors/user_idle_processor.py | 77 ++++++++++++++++--- 3 files changed, 101 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 39c2a070f..ff51f936b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - It is now possible to specify the period of the `PipelineTask` heartbeat frames with `heartbeats_period_secs`. -- Added `DailyMeetingTokenProperties` and `DailyMeetingTokenParams` Pydantic models +- Added `DailyMeetingTokenProperties` and `DailyMeetingTokenParams` Pydantic models for meeting token creation in `get_token` method of `DailyRESTHelper`. - Added `enable_recording` and `geo` parameters to `DailyRoomProperties`. @@ -21,6 +21,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- Enhanced `UserIdleProcessor` with retry functionality and control over idle + monitoring via new callback signature `(processor, retry_count) -> bool`. + Updated the `17-detect-user-idle.py` to show how to use the `retry_count`. + - Add defensive error handling for `OpenAIRealtimeBetaLLMService`'s audio truncation. Audio truncation errors during interruptions now log a warning and allow the session to continue instead of throwing an exception. diff --git a/examples/foundational/17-detect-user-idle.py b/examples/foundational/17-detect-user-idle.py index aa94d2b60..e0af16c92 100644 --- a/examples/foundational/17-detect-user-idle.py +++ b/examples/foundational/17-detect-user-idle.py @@ -14,7 +14,7 @@ from runner import configure from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import LLMMessagesFrame +from pipecat.frames.frames import EndFrame, LLMMessagesFrame, TTSSpeakFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -63,16 +63,36 @@ async def main(): context = OpenAILLMContext(messages) context_aggregator = llm.create_context_aggregator(context) - async def user_idle_callback(user_idle: UserIdleProcessor): - messages.append( - { - "role": "system", - "content": "Ask the user if they are still there and try to prompt for some input, but be short.", - } - ) - await user_idle.push_frame(LLMMessagesFrame(messages)) - - user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0) + async def handle_user_idle(user_idle: UserIdleProcessor, retry_count: int) -> bool: + if retry_count == 1: + # First attempt: Add a gentle prompt to the conversation + messages.append( + { + "role": "system", + "content": "The user has been quiet. Politely and briefly ask if they're still there.", + } + ) + await user_idle.push_frame(LLMMessagesFrame(messages)) + return True + elif retry_count == 2: + # Second attempt: More direct prompt + messages.append( + { + "role": "system", + "content": "The user is still inactive. Ask if they'd like to continue our conversation.", + } + ) + await user_idle.push_frame(LLMMessagesFrame(messages)) + return True + else: + # Third attempt: End the conversation + await user_idle.push_frame( + TTSSpeakFrame("It seems like you're busy right now. Have a nice day!") + ) + await task.queue_frame(EndFrame()) + return False + + user_idle = UserIdleProcessor(callback=handle_user_idle, timeout=5.0) pipeline = Pipeline( [ diff --git a/src/pipecat/processors/user_idle_processor.py b/src/pipecat/processors/user_idle_processor.py index 3a7202c80..9339e2481 100644 --- a/src/pipecat/processors/user_idle_processor.py +++ b/src/pipecat/processors/user_idle_processor.py @@ -5,7 +5,7 @@ # import asyncio -from typing import Awaitable, Callable +from typing import Awaitable, Callable, Union from pipecat.frames.frames import ( BotSpeakingFrame, @@ -25,11 +25,23 @@ class UserIdleProcessor(FrameProcessor): or BotSpeaking). Args: - callback: Function to call when user is idle + callback: Function to call when user is idle. Can be either: + - Basic callback(processor) -> None + - Retry callback(processor, retry_count) -> bool + Return True to continue monitoring for idle events, + Return False to stop the idle monitoring task timeout: Seconds to wait before considering user idle **kwargs: Additional arguments passed to FrameProcessor Example: + # Retry callback: + async def handle_idle(processor: "UserIdleProcessor", retry_count: int) -> bool: + if retry_count < 3: + await send_reminder("Are you still there?") + return True + return False + + # Basic callback: async def handle_idle(processor: "UserIdleProcessor") -> None: await send_reminder("Are you still there?") @@ -42,24 +54,60 @@ async def handle_idle(processor: "UserIdleProcessor") -> None: def __init__( self, *, - callback: Callable[["UserIdleProcessor"], Awaitable[None]], + callback: Union[ + Callable[["UserIdleProcessor"], Awaitable[None]], # Basic + Callable[["UserIdleProcessor", int], Awaitable[bool]], # Retry + ], timeout: float, **kwargs, ): super().__init__(**kwargs) - self._callback = callback + self._callback = self._wrap_callback(callback) self._timeout = timeout + self._retry_count = 0 self._interrupted = False self._conversation_started = False self._idle_task = None self._idle_event = asyncio.Event() - def _create_idle_task(self): - """Create the idle task if it hasn't been created yet.""" + def _wrap_callback( + self, + callback: Union[ + Callable[["UserIdleProcessor"], Awaitable[None]], + Callable[["UserIdleProcessor", int], Awaitable[bool]], + ], + ) -> Callable[["UserIdleProcessor", int], Awaitable[bool]]: + """Wraps callback to support both basic and retry signatures. + + Args: + callback: The callback function to wrap. + + Returns: + A wrapped callback that returns bool to indicate whether to continue monitoring. + """ + + async def wrapper(processor: "UserIdleProcessor", retry_count: int) -> bool: + if len(callback.__code__.co_varnames) == 1: + # Basic callback + await callback(processor) # type: ignore + return True + else: + # Retry callback + return await callback(processor, retry_count) # type: ignore + + return wrapper + + def _create_idle_task(self) -> None: + """Creates the idle task if it hasn't been created yet.""" if self._idle_task is None: self._idle_task = self.get_event_loop().create_task(self._idle_task_handler()) - async def _stop(self): + @property + def retry_count(self) -> int: + """Returns the current retry count.""" + return self._retry_count + + async def _stop(self) -> None: """Stops and cleans up the idle monitoring task.""" if self._idle_task is not None: self._idle_task.cancel() @@ -69,7 +117,7 @@ async def _stop(self): pass # Expected when task is cancelled self._idle_task = None - async def process_frame(self, frame: Frame, direction: FrameDirection): + async def process_frame(self, frame: Frame, direction: FrameDirection) -> None: """Processes incoming frames and manages idle monitoring state. Args: @@ -98,6 +146,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): if self._conversation_started: # We shouldn't call the idle callback if the user or the bot are speaking if isinstance(frame, UserStartedSpeakingFrame): + self._retry_count = 0 # Reset retry count when user speaks self._interrupted = True self._idle_event.set() elif isinstance(frame, UserStoppedSpeakingFrame): @@ -106,22 +155,26 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): elif isinstance(frame, BotSpeakingFrame): self._idle_event.set() - async def cleanup(self): + async def cleanup(self) -> None: """Cleans up resources when processor is shutting down.""" if self._idle_task: # Only stop if task exists await self._stop() - async def _idle_task_handler(self): + async def _idle_task_handler(self) -> None: """Monitors for idle timeout and triggers callbacks. - Runs in a loop until cancelled. + Runs in a loop until cancelled or callback indicates completion. """ while True: try: await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout) except asyncio.TimeoutError: if not self._interrupted: - await self._callback(self) + self._retry_count += 1 + should_continue = await self._callback(self, self._retry_count) + if not should_continue: + await self._stop() + break except asyncio.CancelledError: break finally: From 9e9822f17d44591965029a5a1483055fa266b662 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 27 Jan 2025 10:23:26 -0500 Subject: [PATCH 2/2] Use inspect.signature to determine which callback to use --- src/pipecat/processors/user_idle_processor.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/pipecat/processors/user_idle_processor.py b/src/pipecat/processors/user_idle_processor.py index 9339e2481..7c2f1c763 100644 --- a/src/pipecat/processors/user_idle_processor.py +++ b/src/pipecat/processors/user_idle_processor.py @@ -5,6 +5,7 @@ # import asyncio +import inspect from typing import Awaitable, Callable, Union from pipecat.frames.frames import ( @@ -85,9 +86,11 @@ def _wrap_callback( Returns: A wrapped callback that returns bool to indicate whether to continue monitoring. """ + sig = inspect.signature(callback) + param_count = len(sig.parameters) async def wrapper(processor: "UserIdleProcessor", retry_count: int) -> bool: - if len(callback.__code__.co_varnames) == 1: + if param_count == 1: # Basic callback await callback(processor) # type: ignore return True