-
There are ways to combine channel values by annotating the channel with a reducer function: an append-only reducer, or a replacing reducer where that fits. My case, however, is to let a node with two or more preceding nodes know what its immediate inputs are. I tried the replacing approach, but the input then contains the output of only one of the preceding nodes, i.e. the node attached last to the current one. I also tried the append-only approach, but I cannot figure out how to discard the results of older nodes. Here is my code:

def add(a: list[str], b: list[str] | str) -> list[str]:
if isinstance(b, str):
return a + [b]
return a + b
def replace(a: list[str], b: list[str] | str) -> list[str]:
if isinstance(b, str):
return [b]
return b
class Message(TypedDict):
    """Graph state schema with a single ``message`` key."""

    # The ``replace`` function is attached as annotation metadata; it is
    # presumably picked up by the graph as the reducer for this key.
    message: Annotated[list[str], replace]
def answer_node1(state: Message):
    """Print the incoming state and return ``{"message": "node1"}``."""
    print("---Answer Node 1---")
    # NOTE(review): "a" is not a key declared on the visible Message schema;
    # this looks like a leftover debug print -- confirm before relying on it.
    print(state.get("a"))
    print(state.get("message"))
    return {"message": "node1"}
def answer_node2(message: Message):
    """Print the incoming ``message`` value and return ``{"message": "node2"}``."""
    print("---Answer Node 2---")
    print(message.get("message"))
    return {"message": "node2"}
def answer_node3(message: Message):
    """Print the incoming ``message`` value and return ``{"message": "node3"}``."""
    print("---Answer Node 3---")
    print(message.get("message"))
    return {"message": "node3"}
def answer_node4(message: Message):
    """Print the incoming ``message`` value and return ``{"message": "node4"}``."""
    print("---Answer Node 4---")
    print(message.get("message"))
    return {"message": "node4"}
# Build the graph over the Message state schema.
graph = StateGraph(state_schema=Message)

# Register the four node functions.
graph.add_node(answer_node1)
graph.add_node(answer_node2)
graph.add_node(answer_node3)
graph.add_node(answer_node4)

# Wire a diamond: node1 fans out to node2 and node3, which both feed node4.
graph.add_edge(START, "answer_node1")
graph.add_edge("answer_node1", "answer_node2")
graph.add_edge("answer_node1", "answer_node3")
graph.add_edge("answer_node2", "answer_node4")
graph.add_edge("answer_node3", "answer_node4")
graph.add_edge("answer_node4", END)

# Rebind ``graph`` from the builder to the compiled graph.
graph = graph.compile()
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 4 replies
-
@cyberelf I think you could just keep some sort of IDs associated with each message (i.e. which node the message came from) and then use only the ones that you need for each downstream node? |
Beta Was this translation helpful? Give feedback.
-
@cyberelf actually, you can do this:

from typing import TypedDict, Annotated
from langgraph.graph.state import StateGraph, START, END
from langgraph.channels.topic import Topic
class Message(TypedDict):
    """Graph state schema whose ``message`` key is backed by a ``Topic`` channel."""

    # ``accumulate=False`` is passed to the Topic channel.
    # NOTE(review): a Topic channel presumably delivers a sequence of values,
    # so nodes may see a list here rather than the annotated ``str`` -- confirm.
    message: Annotated[str, Topic(str, accumulate=False)]
def answer_node1(state: Message):
    """Print the incoming ``message`` value and return ``{"message": "node1"}``."""
    print("---Answer Node 1---")
    print(state.get("message"))
    return {"message": "node1"}
def answer_node2(message: Message):
    """Print the incoming ``message`` value and return ``{"message": "node2"}``."""
    print("---Answer Node 2---")
    print(message.get("message"))
    return {"message": "node2"}
def answer_node3(message: Message):
    """Print the incoming ``message`` value and return ``{"message": "node3"}``."""
    print("---Answer Node 3---")
    print(message.get("message"))
    return {"message": "node3"}
def answer_node4(message: Message):
    """Print the incoming ``message`` value and return ``{"message": "node4"}``."""
    print("---Answer Node 4---")
    print(message.get("message"))
    return {"message": "node4"}
# Build the graph over the Message state schema.
graph = StateGraph(state_schema=Message)

# Register the node functions in their original order.
for node_fn in (answer_node1, answer_node2, answer_node3, answer_node4):
    graph.add_node(node_fn)

# Diamond topology: node1 fans out to node2 and node3, which both feed node4.
for src, dst in (
    (START, "answer_node1"),
    ("answer_node1", "answer_node2"),
    ("answer_node1", "answer_node3"),
    ("answer_node2", "answer_node4"),
    ("answer_node3", "answer_node4"),
    ("answer_node4", END),
):
    graph.add_edge(src, dst)

# Rebind ``graph`` from the builder to the compiled graph.
graph = graph.compile()
graph.invoke({"message": "hi"})

Let me know if this addresses your question! |
Beta Was this translation helpful? Give feedback.
@cyberelf actually, you can do this: