Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update dynamic flows to use a per-function transition handler, support node functions #82

Merged
merged 8 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,39 @@ All notable changes to **Pipecat Flows** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Changed

- Updated dynamic flows to use per-function transition handlers:
- Removed global `transition_callback` in favor of `transition_callbacks`
dict
- Each edge function now explicitly registers its transition handler
- Clearer distinction between node functions (LLM completion) and edge
functions (transitions)
- Better type safety and error detection for transition handlers
- Breaking change: Dynamic flows must now use `transition_callbacks` instead
of `transition_callback`

Example of the new pattern:

```python
# Before - global router
flow_manager = FlowManager(
transition_callback=handle_transition
)

# After - per-function transitions
flow_manager = FlowManager(
transition_callbacks={

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an alternative, I would consider specifying the per-function transition handler directly in the function definition code, similar to how static flow management specifies the "transition_to" field.

For example:

def create_marital_status_node() -> NodeConfig:
    """Create node for collecting marital status."""
    return {
        "task_messages": [
            {
                "role": "system",
                "content": "Ask about the customer's marital status for premium calculation.",
            }
        ],
        "functions": [
            {
                "type": "function",
                "function": {
                    "name": "collect_marital_status",
                    "handler": collect_marital_status,
                    "description": "Record marital status",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "marital_status": {"type": "string", "enum": ["single", "married"]}
                        },
                        "required": ["marital_status"],
                    },
                },
                "transition_callback": handle_marital_status_collection,
            }
        ],
    }

I prefer this approach because it is more consistent with the static flow management style.

Copy link
Contributor Author

@markbackman markbackman Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm considering this. I'm going to merge this change, then I'll open a new PR to see how making the change you suggested looks.

"collect_age": handle_age_collection,
"collect_status": handle_status_collection
}
)
```

- Updated all dynamic flow examples to use the new transition handler pattern

## [0.0.11] - 2025-01-19

### Changed
Expand Down
37 changes: 33 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ Functions come in two types:

2. **Edge Functions**: Create transitions between states

Static flows use `transition_to`:

```python
{
"type": "function",
Expand All @@ -132,15 +134,27 @@ Functions come in two types:
}
```

Dynamic flows use registered transition handlers:

```python
# Register transition handlers
flow_manager = FlowManager(
transition_callbacks={
"collect_age": handle_age_collection,
"collect_status": handle_status_collection
}
)
```

Functions behave differently based on their type:

Node Functions execute their handler and trigger an immediate LLM completion with the result
Edge Functions execute their handler (if any) and transition to a new node, with the LLM completion occurring after both the function result and new node's messages are added to context
- Node Functions execute their handler and trigger an immediate LLM completion with the result
- Edge Functions execute their handler (if provided) and transition to a new node, with the LLM completion occurring after both the function result and new node's messages are added to context

Functions can:

- Have a handler (for data processing)
- Have a transition_to (for state changes)
- Have a transition_to or transition callback (for state changes)
- Have both (process data and transition)
- Have neither (end node functions)

Expand Down Expand Up @@ -258,6 +272,7 @@ await flow_manager.initialize()
#### Dynamic Flows

```python
# Create nodes
def create_initial_node() -> NodeConfig:
return {
"role_messages": [
Expand Down Expand Up @@ -291,13 +306,27 @@ def create_initial_node() -> NodeConfig:
]
}

# Define handlers
async def update_coverage(args: FlowArgs) -> FlowResult:
"""Update coverage options; node function without a transition."""
return {"coverage": args["coverage"]}

# Edge function - handles transition
async def handle_age_collection(args: Dict, flow_manager: FlowManager):
"""Handle age collection transition; edge function which transitions to the next node."""
flow_manager.state["age"] = args["age"]
await flow_manager.set_node("next", create_next_node())

# Initialize with transition callback
flow_manager = FlowManager(
task=task,
llm=llm,
context_aggregator=context_aggregator,
tts=tts,
transition_callback=handle_transition,
transition_callbacks={
"collect_age": handle_age_collection,
}
,
)
await flow_manager.initialize()

Expand Down
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ pip-tools~=7.4.1
pytest~=8.3.2
pytest-asyncio~=0.23.5
pytest-cov~=4.1.0
ruff~=0.6.7
ruff~=0.9.1
setuptools~=72.2.0
python-dotenv~=1.0.1
51 changes: 20 additions & 31 deletions examples/dynamic/insurance_anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""
Insurance Quote Example using Pipecat Dynamic Flows with Anthropic Claude
"""Insurance Quote Example using Pipecat Dynamic Flows with Anthropic Claude.

This example demonstrates how to create a conversational insurance quote bot using:
- Dynamic flow management for flexible conversation paths
Expand Down Expand Up @@ -44,11 +43,11 @@
from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport

from pipecat_flows import FlowArgs, FlowManager, FlowResult, NodeConfig

sys.path.append(str(Path(__file__).parent.parent))
from runner import configure

from pipecat_flows import FlowArgs, FlowManager, FlowResult, NodeConfig

load_dotenv(override=True)

logger.remove(0)
Expand Down Expand Up @@ -92,14 +91,14 @@ async def collect_age(args: FlowArgs) -> AgeCollectionResult:
"""Process age collection."""
age = args["age"]
logger.debug(f"collect_age handler executing with age: {age}")
return {"age": age}
return AgeCollectionResult(age=age)


async def collect_marital_status(args: FlowArgs) -> MaritalStatusResult:
"""Process marital status collection."""
status = args["marital_status"]
logger.debug(f"collect_marital_status handler executing with status: {status}")
return {"marital_status": status}
return MaritalStatusResult(marital_status=status)


async def calculate_quote(args: FlowArgs) -> QuoteCalculationResult:
Expand Down Expand Up @@ -281,9 +280,11 @@ def create_quote_results_node(
f"Monthly Premium: ${quote['monthly_premium']:.2f}\n"
f"Coverage Amount: ${quote['coverage_amount']:,}\n"
f"Deductible: ${quote['deductible']:,}\n\n"
"Explain these quote details to the customer. "
"Ask if they would like to adjust the coverage amount or deductible. "
"They can also end the quote process if they're satisfied."
"Explain these quote details to the customer. When they request changes, "
"use update_coverage to recalculate their quote. Explain how their "
"changes affected the premium and compare it to their previous quote. "
"Ask if they'd like to make any other adjustments or if they're ready "
"to end the quote process."
),
}
],
Expand All @@ -293,7 +294,7 @@ def create_quote_results_node(
{
"name": "update_coverage",
"handler": update_coverage,
"description": "Update coverage options",
"description": "Recalculate quote with new coverage options",
"input_schema": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -347,43 +348,26 @@ async def handle_age_collection(args: Dict, flow_manager: FlowManager):
flow_manager.state["age"] = args["age"]
await flow_manager.set_node("marital_status", create_marital_status_node())


async def handle_marital_status_collection(args: Dict, flow_manager: FlowManager):
flow_manager.state["marital_status"] = args["marital_status"]
await flow_manager.set_node(
"quote_calculation",
create_quote_calculation_node(
flow_manager.state["age"],
flow_manager.state["marital_status"]
flow_manager.state["age"], flow_manager.state["marital_status"]
),
)


async def handle_quote_calculation(args: Dict, flow_manager: FlowManager):
quote = await calculate_quote(args)
flow_manager.state["quote"] = quote
await flow_manager.set_node("quote_results", create_quote_results_node(quote))

async def handle_coverage_update(args: Dict, flow_manager: FlowManager):
updated_quote = await update_coverage(args)
flow_manager.state["quote"] = updated_quote
await flow_manager.set_node("quote_results", create_quote_results_node(updated_quote))

async def handle_end_quote(_: Dict, flow_manager: FlowManager):
await flow_manager.set_node("end", create_end_node())

HANDLERS = {
"collect_age": handle_age_collection,
"collect_marital_status": handle_marital_status_collection,
"calculate_quote": handle_quote_calculation,
"update_coverage": handle_coverage_update,
"end_quote": handle_end_quote,
}

async def handle_insurance_transition(function_name: str, args: Dict, flow_manager: FlowManager):
"""Handle transitions between insurance flow states."""
logger.debug(f"Processing {function_name} transition with args: {args}")
await HANDLERS[function_name](args, flow_manager)



async def main():
"""Main function to set up and run the insurance quote bot."""
Expand Down Expand Up @@ -433,7 +417,12 @@ async def main():
llm=llm,
context_aggregator=context_aggregator,
tts=tts,
transition_callback=handle_insurance_transition,
transition_callbacks={
"collect_age": handle_age_collection,
"collect_marital_status": handle_marital_status_collection,
"calculate_quote": handle_quote_calculation,
"end_quote": handle_end_quote,
},
)

@transport.event_handler("on_first_participant_joined")
Expand Down
49 changes: 20 additions & 29 deletions examples/dynamic/insurance_gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""
Insurance Quote Example using Pipecat Dynamic Flows with Google Gemini
"""Insurance Quote Example using Pipecat Dynamic Flows with Google Gemini.

This example demonstrates how to create a conversational insurance quote bot using:
- Dynamic flow management for flexible conversation paths
Expand Down Expand Up @@ -44,11 +43,11 @@
from pipecat.services.google import GoogleLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport

from pipecat_flows import FlowArgs, FlowManager, FlowResult, NodeConfig

sys.path.append(str(Path(__file__).parent.parent))
from runner import configure

from pipecat_flows import FlowArgs, FlowManager, FlowResult, NodeConfig

load_dotenv(override=True)

logger.remove(0)
Expand Down Expand Up @@ -92,14 +91,14 @@ async def collect_age(args: FlowArgs) -> AgeCollectionResult:
"""Process age collection."""
age = args["age"]
logger.debug(f"collect_age handler executing with age: {age}")
return {"age": age}
return AgeCollectionResult(age=age)


async def collect_marital_status(args: FlowArgs) -> MaritalStatusResult:
"""Process marital status collection."""
status = args["marital_status"]
logger.debug(f"collect_marital_status handler executing with status: {status}")
return {"marital_status": status}
return MaritalStatusResult(marital_status=status)


async def calculate_quote(args: FlowArgs) -> QuoteCalculationResult:
Expand Down Expand Up @@ -271,9 +270,11 @@ def create_quote_results_node(
f"Monthly Premium: ${quote['monthly_premium']:.2f}\n"
f"Coverage Amount: ${quote['coverage_amount']:,}\n"
f"Deductible: ${quote['deductible']:,}\n\n"
"Explain these quote details to the customer. "
"Ask if they would like to adjust the coverage amount or deductible. "
"They can also end the quote process if they're satisfied."
"Explain these quote details to the customer. When they request changes, "
"use update_coverage to recalculate their quote. Explain how their "
"changes affected the premium and compare it to their previous quote. "
"Ask if they'd like to make any other adjustments or if they're ready "
"to end the quote process."
),
}
],
Expand All @@ -283,7 +284,7 @@ def create_quote_results_node(
{
"name": "update_coverage",
"handler": update_coverage,
"description": "Update coverage options when customer requests changes",
"description": "Recalculate quote with new coverage options",
"parameters": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -331,41 +332,26 @@ async def handle_age_collection(args: Dict, flow_manager: FlowManager):
flow_manager.state["age"] = args["age"]
await flow_manager.set_node("marital_status", create_marital_status_node())


async def handle_marital_status_collection(args: Dict, flow_manager: FlowManager):
flow_manager.state["marital_status"] = args["marital_status"]
await flow_manager.set_node(
"quote_calculation",
create_quote_calculation_node(
flow_manager.state["age"],
flow_manager.state["marital_status"]
flow_manager.state["age"], flow_manager.state["marital_status"]
),
)


async def handle_quote_calculation(args: Dict, flow_manager: FlowManager):
quote = await calculate_quote(args)
flow_manager.state["quote"] = quote
await flow_manager.set_node("quote_results", create_quote_results_node(quote))

async def handle_coverage_update(args: Dict, flow_manager: FlowManager):
updated_quote = await update_coverage(args)
flow_manager.state["quote"] = updated_quote
await flow_manager.set_node("quote_results", create_quote_results_node(updated_quote))

async def handle_end_quote(_: Dict, flow_manager: FlowManager):
await flow_manager.set_node("end", create_end_node())

HANDLERS = {
"collect_age": handle_age_collection,
"collect_marital_status": handle_marital_status_collection,
"calculate_quote": handle_quote_calculation,
"update_coverage": handle_coverage_update,
"end_quote": handle_end_quote,
}

async def handle_insurance_transition(function_name: str, args: Dict, flow_manager: FlowManager):
"""Handle transitions between insurance flow states."""
logger.debug(f"Processing {function_name} transition with args: {args}")
await HANDLERS[function_name](args, flow_manager)

async def main():
"""Main function to set up and run the insurance quote bot."""
Expand Down Expand Up @@ -413,7 +399,12 @@ async def main():
llm=llm,
context_aggregator=context_aggregator,
tts=tts,
transition_callback=handle_insurance_transition,
transition_callbacks={
"collect_age": handle_age_collection,
"collect_marital_status": handle_marital_status_collection,
"calculate_quote": handle_quote_calculation,
"end_quote": handle_end_quote,
},
)

@transport.event_handler("on_first_participant_joined")
Expand Down
Loading
Loading