Skip to content

Commit

Permalink
Merge pull request #75 from pipecat-ai/mb/fix-function-call-timing
Browse files Browse the repository at this point in the history
Improve LLM completion timing for edge and node functions
  • Loading branch information
markbackman authored Jan 19, 2025
2 parents 3639a83 + 2725fb4 commit e1aca2c
Show file tree
Hide file tree
Showing 15 changed files with 458 additions and 140 deletions.
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,23 @@ All notable changes to **Pipecat Flows** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Changed

- Updated `FlowManager` to more predictably handle function calls:

- Edge functions (which transition to a new node) now result in an LLM
completion after both the function call and messages are added to the
LLM's context.
- Node functions (which execute a function call without transitioning nodes)
result in an LLM completion upon the function call result returning.
- This change also improves the reliability of the pre- and post-action
execution timing.
- Note: the FlowManager has a new required arg, `context_aggregator`.

- Updated all examples to align with the new changes.

## [0.0.10] - 2024-12-21

### Changed
Expand Down
29 changes: 26 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,13 @@ Here's a basic example of setting up a static conversation flow:
from pipecat_flows import FlowManager

# Initialize flow manager with static configuration
flow_manager = FlowManager(task, llm, tts, flow_config=flow_config)
flow_manager = FlowManager(
task=task,
llm=llm,
context_aggregator=context_aggregator,
tts=tts,
flow_config=flow_config,
)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
Expand Down Expand Up @@ -127,6 +133,11 @@ Functions come in two types:
}
```

Functions behave differently based on their type:

Node Functions execute their handler and trigger an immediate LLM completion with the result
Edge Functions execute their handler (if any) and transition to a new node, with the LLM completion occurring after both the function result and new node's messages are added to context

Functions can:

- Have a handler (for data processing)
Expand Down Expand Up @@ -235,7 +246,13 @@ flow_config = {
}

# Create and initialize the FlowManager
flow_manager = FlowManager(task, llm, tts, flow_config=flow_config)
flow_manager = FlowManager(
task=task,
llm=llm,
context_aggregator=context_aggregator,
tts=tts,
flow_config=flow_config,
)
await flow_manager.initialize()
```

Expand Down Expand Up @@ -276,7 +293,13 @@ def create_initial_node() -> NodeConfig:
}

# Initialize with transition callback
flow_manager = FlowManager(task, llm, tts, transition_callback=handle_transitions)
flow_manager = FlowManager(
task=task,
llm=llm,
context_aggregator=context_aggregator,
tts=tts,
transition_callback=handle_transition,
)
await flow_manager.initialize()

@transport.event_handler("on_first_participant_joined")
Expand Down
7 changes: 5 additions & 2 deletions examples/dynamic/insurance_anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,11 @@ async def main():

# Initialize flow manager with transition callback
flow_manager = FlowManager(
task=task, llm=llm, tts=tts, transition_callback=handle_insurance_transition
task=task,
llm=llm,
context_aggregator=context_aggregator,
tts=tts,
transition_callback=handle_insurance_transition,
)

@transport.event_handler("on_first_participant_joined")
Expand All @@ -439,7 +443,6 @@ async def on_first_participant_joined(transport, participant):
await flow_manager.initialize()
# Set initial node
await flow_manager.set_node("initial", create_initial_node())
await task.queue_frames([context_aggregator.user().get_context_frame()])

# Run the pipeline
runner = PipelineRunner()
Expand Down
7 changes: 5 additions & 2 deletions examples/dynamic/insurance_gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,11 @@ async def main():

# Initialize flow manager with transition callback
flow_manager = FlowManager(
task=task, llm=llm, tts=tts, transition_callback=handle_insurance_transition
task=task,
llm=llm,
context_aggregator=context_aggregator,
tts=tts,
transition_callback=handle_insurance_transition,
)

@transport.event_handler("on_first_participant_joined")
Expand All @@ -419,7 +423,6 @@ async def on_first_participant_joined(transport, participant):
await flow_manager.initialize()
# Set initial node
await flow_manager.set_node("initial", create_initial_node())
await task.queue_frames([context_aggregator.user().get_context_frame()])

# Run the pipeline
runner = PipelineRunner()
Expand Down
8 changes: 5 additions & 3 deletions examples/dynamic/insurance_openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,11 @@ async def main():

# Initialize flow manager with transition callback
flow_manager = FlowManager(
task=task, llm=llm, tts=tts, transition_callback=handle_insurance_transition
task=task,
llm=llm,
context_aggregator=context_aggregator,
tts=tts,
transition_callback=handle_insurance_transition,
)

@transport.event_handler("on_first_participant_joined")
Expand All @@ -413,8 +417,6 @@ async def on_first_participant_joined(transport, participant):
await flow_manager.initialize()
logger.debug("Setting initial node")
await flow_manager.set_node("initial", create_initial_node())
logger.debug("Queueing initial context")
await task.queue_frames([context_aggregator.user().get_context_frame()])

# Run the pipeline
runner = PipelineRunner()
Expand Down
47 changes: 21 additions & 26 deletions examples/static/food_ordering.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,13 @@ async def select_sushi_order(args: FlowArgs) -> SushiOrderResult:
"role": "system",
"content": """You are handling a pizza order. Use the available functions:
- Use select_pizza_order when the user specifies both size AND type
- Use confirm_order when the user confirms they are satisfied with their selection
Pricing:
- Small: $10
- Medium: $15
- Large: $20
After selection, confirm both the size and type, state the price, and ask if they want to confirm their order. Remember to be friendly and casual.""",
Remember to be friendly and casual.""",
}
],
"functions": [
Expand All @@ -184,14 +183,6 @@ async def select_sushi_order(args: FlowArgs) -> SushiOrderResult:
},
"required": ["size", "type"],
},
},
},
{
"type": "function",
"function": {
"name": "confirm_order",
"description": "Proceed to order confirmation",
"parameters": {"type": "object", "properties": {}},
"transition_to": "confirm",
},
},
Expand All @@ -203,12 +194,11 @@ async def select_sushi_order(args: FlowArgs) -> SushiOrderResult:
"role": "system",
"content": """You are handling a sushi order. Use the available functions:
- Use select_sushi_order when the user specifies both count AND type
- Use confirm_order when the user confirms they are satisfied with their selection
Pricing:
- $8 per roll
After selection, confirm both the count and type, state the price, and ask if they want to confirm their order. Remember to be friendly and casual.""",
Remember to be friendly and casual.""",
}
],
"functions": [
Expand All @@ -235,14 +225,6 @@ async def select_sushi_order(args: FlowArgs) -> SushiOrderResult:
},
"required": ["count", "type"],
},
},
},
{
"type": "function",
"function": {
"name": "confirm_order",
"description": "Proceed to order confirmation",
"parameters": {"type": "object", "properties": {}},
"transition_to": "confirm",
},
},
Expand All @@ -252,8 +234,8 @@ async def select_sushi_order(args: FlowArgs) -> SushiOrderResult:
"task_messages": [
{
"role": "system",
"content": """Read back the complete order details to the user and ask for final confirmation. Use the available functions:
- Use complete_order when the user confirms
"content": """Read back the complete order details to the user and if they want anything else or if they want to make changes. Use the available functions:
- Use complete_order when the user confirms that the order is correct and no changes are needed
- Use revise_order if they want to change something
Be friendly and clear when reading back the order details.""",
Expand All @@ -269,13 +251,22 @@ async def select_sushi_order(args: FlowArgs) -> SushiOrderResult:
"transition_to": "end",
},
},
{
"type": "function",
"function": {
"name": "revise_order",
"description": "User wants to make changes to their order",
"parameters": {"type": "object", "properties": {}},
"transition_to": "start",
},
},
],
},
"end": {
"task_messages": [
{
"role": "system",
"content": "Concisely end the conversation—1-3 words is appropriate. Just say 'Bye' or something similarly short.",
"content": "Thank the user for their order and end the conversation politely and concisely.",
}
],
"functions": [],
Expand Down Expand Up @@ -330,15 +321,19 @@ async def main():
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

# Initialize flow manager in static mode
flow_manager = FlowManager(task=task, llm=llm, tts=tts, flow_config=flow_config)
flow_manager = FlowManager(
task=task,
llm=llm,
context_aggregator=context_aggregator,
tts=tts,
flow_config=flow_config,
)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
logger.debug("Initializing flow")
await flow_manager.initialize()
logger.debug("Starting conversation")
await task.queue_frames([context_aggregator.user().get_context_frame()])

runner = PipelineRunner()
await runner.run(task)
Expand Down
9 changes: 7 additions & 2 deletions examples/static/movie_explorer_anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,13 +466,18 @@ async def main():
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

# Initialize flow manager
flow_manager = FlowManager(task=task, llm=llm, tts=tts, flow_config=flow_config)
flow_manager = FlowManager(
task=task,
llm=llm,
context_aggregator=context_aggregator,
tts=tts,
flow_config=flow_config,
)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
await flow_manager.initialize()
await task.queue_frames([context_aggregator.user().get_context_frame()])

runner = PipelineRunner()
await runner.run(task)
Expand Down
9 changes: 7 additions & 2 deletions examples/static/movie_explorer_gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,13 +453,18 @@ async def main():
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

# Initialize flow manager
flow_manager = FlowManager(task=task, llm=llm, tts=tts, flow_config=flow_config)
flow_manager = FlowManager(
task=task,
llm=llm,
context_aggregator=context_aggregator,
tts=tts,
flow_config=flow_config,
)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
await flow_manager.initialize()
await task.queue_frames([context_aggregator.user().get_context_frame()])

runner = PipelineRunner()
await runner.run(task)
Expand Down
9 changes: 7 additions & 2 deletions examples/static/movie_explorer_openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,13 +470,18 @@ async def main():
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

# Initialize flow manager
flow_manager = FlowManager(task=task, llm=llm, tts=tts, flow_config=flow_config)
flow_manager = FlowManager(
task=task,
llm=llm,
context_aggregator=context_aggregator,
tts=tts,
flow_config=flow_config,
)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
await flow_manager.initialize()
await task.queue_frames([context_aggregator.user().get_context_frame()])

runner = PipelineRunner()
await runner.run(task)
Expand Down
10 changes: 7 additions & 3 deletions examples/static/patient_intake.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,15 +465,19 @@ async def main():
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

# Initialize flow manager with LLM
flow_manager = FlowManager(task=task, llm=llm, tts=tts, flow_config=flow_config)
flow_manager = FlowManager(
task=task,
llm=llm,
context_aggregator=context_aggregator,
tts=tts,
flow_config=flow_config,
)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Initialize the flow processor
await flow_manager.initialize()
# Kick off the conversation using the context aggregator
await task.queue_frames([context_aggregator.user().get_context_frame()])

runner = PipelineRunner()
await runner.run(task)
Expand Down
10 changes: 7 additions & 3 deletions examples/static/restaurant_reservation.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,15 +225,19 @@ async def main():
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

# Initialize flow manager with LLM
flow_manager = FlowManager(task=task, llm=llm, tts=tts, flow_config=flow_config)
flow_manager = FlowManager(
task=task,
llm=llm,
context_aggregator=context_aggregator,
tts=tts,
flow_config=flow_config,
)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Initialize the flow processor
await flow_manager.initialize()
# Kick off the conversation using the context aggregator
await task.queue_frames([context_aggregator.user().get_context_frame()])

runner = PipelineRunner()
await runner.run(task)
Expand Down
10 changes: 7 additions & 3 deletions examples/static/travel_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,15 +388,19 @@ async def main():
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

# Initialize flow manager with LLM
flow_manager = FlowManager(task=task, llm=llm, tts=tts, flow_config=flow_config)
flow_manager = FlowManager(
task=task,
llm=llm,
context_aggregator=context_aggregator,
tts=tts,
flow_config=flow_config,
)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Initialize the flow processor
await flow_manager.initialize()
# Kick off the conversation using the context aggregator
await task.queue_frames([context_aggregator.user().get_context_frame()])

runner = PipelineRunner()
await runner.run(task)
Expand Down
Loading

0 comments on commit e1aca2c

Please sign in to comment.