Replay Function Creates New Checkpoints Instead of Replaying Existing State #3015
Hello, I believe there might be an issue with the "replay" functionality in the LangGraph library, as the behavior doesn't align with the documentation. According to the documentation, replaying a state should "efficiently replay previously executed nodes instead of re-executing them" by leveraging prior checkpoint executions. However, when I replay a state using a StateSnapshot's config, the graph re-executes the subsequent nodes and creates new checkpoint IDs, which I did not expect.

Steps to Reproduce:
Run the code below, then stream from an earlier checkpoint in the thread's state history.

Expected Behavior: the previously executed nodes are replayed from the stored checkpoints, without creating new checkpoint IDs.

Actual Behavior: the nodes after the chosen checkpoint are re-executed, and new checkpoint IDs appear in the thread history.

Following is the code:

```python
from langchain_ollama import ChatOllama
from langgraph.graph import StateGraph, START, END, MessagesState
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import ToolNode, tools_condition
from pprint import pprint

llm = ChatOllama(
    model="llama3.1-tool",
    base_url="http://localhost:11434",
    temperature=0.8,
    max_tokens=1000,
)

def multiply(a: int, b: int) -> int:
    """Multiply a and b.

    Args:
        a: first int
        b: second int
    """
    return a * b

def add(a: int, b: int) -> int:
    """Add a and b.

    Args:
        a: first int
        b: second int
    """
    return a + b

def divide(a: int, b: int) -> float:
    """Divide a by b.

    Args:
        a: first int
        b: second int
    """
    return a / b

tools = [multiply, add, divide]
llm_with_tools = llm.bind_tools(tools)

# System message
system_message = SystemMessage(content="You are a helpful AI assistant tasked to perform arithmetic operations on a set of inputs.")

# Node
def assistant(state: MessagesState):
    return {"messages": [llm_with_tools.invoke([system_message] + state["messages"])]}

# Graph
builder = StateGraph(MessagesState)

# Define nodes
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))

# Add edges
builder.add_edge(START, "assistant")
builder.add_conditional_edges("assistant", tools_condition)
builder.add_edge("tools", "assistant")

# Compile with an in-memory checkpointer
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

# Thread
thread = {"configurable": {"thread_id": "1"}}

# Input
initial_input = {"messages": "Multiply 2 and 3."}

# Run the graph
for event in graph.stream(initial_input, thread, stream_mode="values"):
    event["messages"][-1].pretty_print()

# Get history (newest checkpoint first)
all_states = [s for s in graph.get_state_history(thread)]

# Show history
for state in all_states:
    print("ID: ", state.config["configurable"]["checkpoint_id"])
    print("Num Messages: ", len(state.values["messages"]), "Next: ", state.next)
    print("-" * 80)

# Replay from an early checkpoint
to_replay = all_states[-2]
for event in graph.stream(None, config=to_replay.config, stream_mode="values"):
    event["messages"][-1].pretty_print()

# Show the history again
for state in graph.get_state_history(thread):
    print("ID: ", state.config["configurable"]["checkpoint_id"])
    print("Num Messages: ", len(state.values["messages"]), "Next: ", state.next)
    print("-" * 80)
```

History:
Additional Information:
Could you confirm whether this is the intended behavior, or if this might be a bug?
@vbarda our docs wording is misleading. This is working as designed afaict. If you stream/invoke from a prior checkpoint, it doesn't re-execute from the start up to that checkpoint, but it does execute and save new checkpoints for whatever follows. This is how thread forking works.
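A stdlib-only toy model of that append-only behavior (the `ThreadStore`/`run` names here are made up for illustration, not LangGraph APIs): checkpoints are never overwritten, so resuming from an earlier checkpoint leaves the later ones intact and writes the re-executed steps under fresh IDs on a fork.

```python
import itertools

class ThreadStore:
    """Append-only checkpoint log: checkpoint_id -> (parent_id, state)."""
    def __init__(self):
        self._ids = itertools.count(1)
        self.log = {}

    def put(self, parent_id, state):
        cid = next(self._ids)
        self.log[cid] = (parent_id, state)  # never mutate existing entries
        return cid

    def get(self, cid):
        return self.log[cid][1]

def run(store, steps, from_checkpoint=None):
    """Run `steps` (functions state -> state) from a checkpoint.

    Nothing before `from_checkpoint` is re-executed, but every step that
    runs afterwards saves a NEW checkpoint -- forking the thread.
    """
    state = store.get(from_checkpoint) if from_checkpoint else 0
    cid = from_checkpoint
    for step in steps:
        state = step(state)
        cid = store.put(cid, state)
    return cid, state

store = ThreadStore()
inc = lambda s: s + 1

# First run: three steps produce checkpoints 1, 2, 3.
tip, _ = run(store, [inc, inc, inc])

# "Replay" from checkpoint 1: the remaining steps execute again and are
# saved under new ids (4, 5); checkpoints 2 and 3 are left untouched.
fork_tip, _ = run(store, [inc, inc], from_checkpoint=1)

print(sorted(store.log))  # -> [1, 2, 3, 4, 5]
```

The fork's first new checkpoint (id 4) points back at checkpoint 1 as its parent, which is how both branches of the history stay reachable.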