一个长期记忆代理
本教程展示了如何使用 LangGraph 实现一个具有长期记忆能力的代理。该代理可以存储、检索和使用记忆,以增强其与用户的交互。
受 MemGPT 等论文的启发,并借鉴我们自己在长期记忆方面的研究,该图从聊天交互中提取记忆并将其持久化到数据库中。本教程中的“记忆”将以两种方式呈现
- 代理生成的一段文本信息
- 代理以
(subject, predicate, object)
知识三元组形式提取的关于实体的结构化信息。
这些信息稍后可以通过语义方式读取或查询,以便在您的机器人响应特定用户时提供个性化的上下文。
关键思想是,通过保存记忆,代理持久化了跨多个对话(线程)共享的用户信息,这与 LangGraph 的 持久化 功能已支持的单次对话记忆不同。
您还可以在 此存储库 中查看此代理的完整实现。
安装依赖
%pip install -U --quiet langgraph langchain-openai langchain-tavily tiktoken
import getpass
import os
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("OPENAI_API_KEY")
_set_env("TAVILY_API_KEY")
OPENAI_API_KEY: ········
TAVILY_API_KEY: ········
import json
from typing import List, Literal, Optional
import tiktoken
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.messages import get_buffer_string
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import tool
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import ChatOpenAI
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain_tavily import TavilySearch
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph.prebuilt import ToolNode
定义记忆的向量存储
首先,让我们定义用于存储记忆的向量存储。记忆将以嵌入的形式存储,之后可以根据对话上下文进行查找。我们将使用一个内存中的向量存储。
recall_vector_store = InMemoryVectorStore(OpenAIEmbeddings())
定义工具
接下来,让我们定义我们的记忆工具。我们将需要一个工具来存储记忆,另一个工具来搜索它们以找到最相关的记忆。
import uuid
def get_user_id(config: RunnableConfig) -> str:
user_id = config["configurable"].get("user_id")
if user_id is None:
raise ValueError("User ID needs to be provided to save a memory.")
return user_id
@tool
def save_recall_memory(memory: str, config: RunnableConfig) -> str:
"""Save memory to vectorstore for later semantic retrieval."""
user_id = get_user_id(config)
document = Document(
page_content=memory, id=str(uuid.uuid4()), metadata={"user_id": user_id}
)
recall_vector_store.add_documents([document])
return memory
@tool
def search_recall_memories(query: str, config: RunnableConfig) -> List[str]:
"""Search for relevant memories."""
user_id = get_user_id(config)
def _filter_function(doc: Document) -> bool:
return doc.metadata.get("user_id") == user_id
documents = recall_vector_store.similarity_search(
query, k=3, filter=_filter_function
)
return [document.page_content for document in documents]
此外,让我们让代理能够使用 Tavily 搜索网页。
search = TavilySearch(max_results=1)
tools = [save_recall_memory, search_recall_memories, search]
定义状态、节点和边
我们的图状态将只包含两个通道——messages
用于跟踪聊天历史,以及 recall_memories
——在调用代理之前引入并传递给代理系统提示的上下文记忆。
class State(MessagesState):
# add memories that will be retrieved based on the conversation context
recall_memories: List[str]
# Define the prompt template for the agent
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are a helpful assistant with advanced long-term memory"
" capabilities. Powered by a stateless LLM, you must rely on"
" external memory to store information between conversations."
" Utilize the available memory tools to store and retrieve"
" important details that will help you better attend to the user's"
" needs and understand their context.\n\n"
"Memory Usage Guidelines:\n"
"1. Actively use memory tools (save_core_memory, save_recall_memory)"
" to build a comprehensive understanding of the user.\n"
"2. Make informed suppositions and extrapolations based on stored"
" memories.\n"
"3. Regularly reflect on past interactions to identify patterns and"
" preferences.\n"
"4. Update your mental model of the user with each new piece of"
" information.\n"
"5. Cross-reference new information with existing memories for"
" consistency.\n"
"6. Prioritize storing emotional context and personal values"
" alongside facts.\n"
"7. Use memory to anticipate needs and tailor responses to the"
" user's style.\n"
"8. Recognize and acknowledge changes in the user's situation or"
" perspectives over time.\n"
"9. Leverage memories to provide personalized examples and"
" analogies.\n"
"10. Recall past challenges or successes to inform current"
" problem-solving.\n\n"
"## Recall Memories\n"
"Recall memories are contextually retrieved based on the current"
" conversation:\n{recall_memories}\n\n"
"## Instructions\n"
"Engage with the user naturally, as a trusted colleague or friend."
" There's no need to explicitly mention your memory capabilities."
" Instead, seamlessly incorporate your understanding of the user"
" into your responses. Be attentive to subtle cues and underlying"
" emotions. Adapt your communication style to match the user's"
" preferences and current emotional state. Use tools to persist"
" information you want to retain in the next conversation. If you"
" do call tools, all text preceding the tool call is an internal"
" message. Respond AFTER calling the tool, once you have"
" confirmation that the tool completed successfully.\n\n",
),
("placeholder", "{messages}"),
]
)
model = ChatOpenAI(model_name="gpt-4o")
model_with_tools = model.bind_tools(tools)
tokenizer = tiktoken.encoding_for_model("gpt-4o")
def agent(state: State) -> State:
"""Process the current state and generate a response using the LLM.
Args:
state (schemas.State): The current state of the conversation.
Returns:
schemas.State: The updated state with the agent's response.
"""
bound = prompt | model_with_tools
recall_str = (
"<recall_memory>\n" + "\n".join(state["recall_memories"]) + "\n</recall_memory>"
)
prediction = bound.invoke(
{
"messages": state["messages"],
"recall_memories": recall_str,
}
)
return {
"messages": [prediction],
}
def load_memories(state: State, config: RunnableConfig) -> State:
"""Load memories for the current conversation.
Args:
state (schemas.State): The current state of the conversation.
config (RunnableConfig): The runtime configuration for the agent.
Returns:
State: The updated state with loaded memories.
"""
convo_str = get_buffer_string(state["messages"])
convo_str = tokenizer.decode(tokenizer.encode(convo_str)[:2048])
recall_memories = search_recall_memories.invoke(convo_str, config)
return {
"recall_memories": recall_memories,
}
def route_tools(state: State):
"""Determine whether to use tools or end the conversation based on the last message.
Args:
state (schemas.State): The current state of the conversation.
Returns:
Literal["tools", "__end__"]: The next step in the graph.
"""
msg = state["messages"][-1]
if msg.tool_calls:
return "tools"
return END
构建图
我们的代理图将与简单的 ReAct 代理 非常相似。唯一重要的修改是在首次调用代理之前添加一个节点来加载记忆。
# Create the graph and add nodes
builder = StateGraph(State)
builder.add_node(load_memories)
builder.add_node(agent)
builder.add_node("tools", ToolNode(tools))
# Add edges to the graph
builder.add_edge(START, "load_memories")
builder.add_edge("load_memories", "agent")
builder.add_conditional_edges("agent", route_tools, ["tools", END])
builder.add_edge("tools", "agent")
# Compile the graph
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))
运行代理!
让我们第一次运行代理,并告诉它一些关于用户的信息!
def pretty_print_stream_chunk(chunk):
for node, updates in chunk.items():
print(f"Update from node: {node}")
if "messages" in updates:
updates["messages"][-1].pretty_print()
else:
print(updates)
print("\n")
# NOTE: we're specifying `user_id` to save memories for a given user
config = {"configurable": {"user_id": "1", "thread_id": "1"}}
for chunk in graph.stream({"messages": [("user", "my name is John")]}, config=config):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': []}
Update from node: agent
==================================[1m Ai Message [0m==================================
Tool Calls:
save_recall_memory (call_OqfbWodmrywjMnB1v3p19QLt)
Call ID: call_OqfbWodmrywjMnB1v3p19QLt
Args:
memory: User's name is John.
Update from node: tools
=================================[1m Tool Message [0m=================================
Name: save_recall_memory
User's name is John.
Update from node: agent
==================================[1m Ai Message [0m==================================
Nice to meet you, John! How can I assist you today?
您可以看到代理保存了关于用户姓名的记忆。让我们再添加一些关于用户的信息!
for chunk in graph.stream({"messages": [("user", "i love pizza")]}, config=config):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ["User's name is John."]}
Update from node: agent
==================================[1m Ai Message [0m==================================
Tool Calls:
save_recall_memory (call_xxEivMuWCURJrGxMZb02Eh31)
Call ID: call_xxEivMuWCURJrGxMZb02Eh31
Args:
memory: John loves pizza.
Update from node: tools
=================================[1m Tool Message [0m=================================
Name: save_recall_memory
John loves pizza.
Update from node: agent
==================================[1m Ai Message [0m==================================
Pizza is amazing! Do you have a favorite type or topping?
for chunk in graph.stream(
{"messages": [("user", "yes -- pepperoni!")]},
config={"configurable": {"user_id": "1", "thread_id": "1"}},
):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ["User's name is John.", 'John loves pizza.']}
Update from node: agent
==================================[1m Ai Message [0m==================================
Tool Calls:
save_recall_memory (call_AFrtCVwIEr48Fim80zlhe6xg)
Call ID: call_AFrtCVwIEr48Fim80zlhe6xg
Args:
memory: John's favorite pizza topping is pepperoni.
Update from node: tools
=================================[1m Tool Message [0m=================================
Name: save_recall_memory
John's favorite pizza topping is pepperoni.
Update from node: agent
==================================[1m Ai Message [0m==================================
Pepperoni is a classic choice! Do you have a favorite pizza place, or do you enjoy making it at home?
for chunk in graph.stream(
{"messages": [("user", "i also just moved to new york")]},
config={"configurable": {"user_id": "1", "thread_id": "1"}},
):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ["User's name is John.", 'John loves pizza.', "John's favorite pizza topping is pepperoni."]}
Update from node: agent
==================================[1m Ai Message [0m==================================
Tool Calls:
save_recall_memory (call_Na86uY9eBzaJ0sS0GM4Z9tSf)
Call ID: call_Na86uY9eBzaJ0sS0GM4Z9tSf
Args:
memory: John just moved to New York.
Update from node: tools
=================================[1m Tool Message [0m=================================
Name: save_recall_memory
John just moved to New York.
Update from node: agent
==================================[1m Ai Message [0m==================================
Welcome to New York! That's a fantastic place for a pizza lover. Have you had a chance to explore any of the famous pizzerias there yet?
现在我们可以在不同的线程中使用保存的用户信息。让我们试一试
config = {"configurable": {"user_id": "1", "thread_id": "2"}}
for chunk in graph.stream(
{"messages": [("user", "where should i go for dinner?")]}, config=config
):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ['John loves pizza.', "User's name is John.", 'John just moved to New York.']}
Update from node: agent
==================================[1m Ai Message [0m==================================
Considering you just moved to New York and love pizza, I'd recommend checking out some of the iconic pizza places in the city. Some popular spots include:
1. **Di Fara Pizza** in Brooklyn – Known for its classic New York-style pizza.
2. **Joe's Pizza** in Greenwich Village – A historic pizzeria with a great reputation.
3. **Lucali** in Carroll Gardens, Brooklyn – Often ranked among the best for its delicious thin-crust pies.
Would you like more recommendations or information about any of these places?
请注意,代理在回答之前会加载最相关的记忆,在我们的例子中,它根据食物偏好和位置建议了晚餐。
最后,让我们结合其余的对话上下文和记忆,使用搜索工具来查找披萨店的位置
for chunk in graph.stream(
{"messages": [("user", "what's the address for joe's in greenwich village?")]},
config=config,
):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ['John loves pizza.', 'John just moved to New York.', "John's favorite pizza topping is pepperoni."]}
Update from node: agent
==================================[1m Ai Message [0m==================================
Tool Calls:
tavily_search_results_json (call_aespiB28jpTFvaC4d0qpfY6t)
Call ID: call_aespiB28jpTFvaC4d0qpfY6t
Args:
query: Joe's Pizza Greenwich Village NYC address
Update from node: tools
=================================[1m Tool Message [0m=================================
Name: tavily_search_results_json
[{"url": "https://www.joespizzanyc.com/locations-1-1", "content": "Joe's Pizza Greenwich Village (Original Location) 7 Carmine Street New York, NY 10014 (212) 366-1182 Joe's Pizza Times Square 1435 Broadway New York, NY 10018 (646) 559-4878. TIMES SQUARE MENU. ORDER JOE'S TIMES SQUARE Joe's Pizza Williamsburg 216 Bedford Avenue Brooklyn, NY 11249"}]
Update from node: agent
==================================[1m Ai Message [0m==================================
The address for Joe's Pizza in Greenwich Village is:
**7 Carmine Street, New York, NY 10014**
Enjoy your pizza!
如果您传递不同的用户ID,代理的响应将不会个性化,因为我们还没有保存其他用户的任何信息。
添加结构化记忆
到目前为止,我们已将记忆表示为字符串,例如“John loves pizza”。这是一种将记忆持久化到向量存储中的自然表示。如果您的用例将受益于其他持久化后端(例如图数据库),我们可以更新我们的应用程序以生成具有额外结构的记忆。
下面,我们更新了 save_recall_memory
工具,使其接受一个“知识三元组”列表,即包含 subject
、predicate
和 object
的三元组,适用于存储在知识图中。我们的模型随后将这些表示作为其工具调用的一部分进行生成。
为简单起见,我们使用与之前相同的向量数据库,但 save_recall_memory
和 search_recall_memories
工具可以进一步更新以与图数据库交互。目前,我们只需要更新 save_recall_memory
工具。
recall_vector_store = InMemoryVectorStore(OpenAIEmbeddings())
from typing_extensions import TypedDict
class KnowledgeTriple(TypedDict):
subject: str
predicate: str
object_: str
@tool
def save_recall_memory(memories: List[KnowledgeTriple], config: RunnableConfig) -> str:
"""Save memory to vectorstore for later semantic retrieval."""
user_id = get_user_id(config)
for memory in memories:
serialized = " ".join(memory.values())
document = Document(
serialized,
id=str(uuid.uuid4()),
metadata={
"user_id": user_id,
**memory,
},
)
recall_vector_store.add_documents([document])
return memories
然后我们可以像之前一样编译图
tools = [save_recall_memory, search_recall_memories, search]
model_with_tools = model.bind_tools(tools)
# Create the graph and add nodes
builder = StateGraph(State)
builder.add_node(load_memories)
builder.add_node(agent)
builder.add_node("tools", ToolNode(tools))
# Add edges to the graph
builder.add_edge(START, "load_memories")
builder.add_edge("load_memories", "agent")
builder.add_conditional_edges("agent", route_tools, ["tools", END])
builder.add_edge("tools", "agent")
# Compile the graph
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
config = {"configurable": {"user_id": "3", "thread_id": "1"}}
for chunk in graph.stream({"messages": [("user", "Hi, I'm Alice.")]}, config=config):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': []}
Update from node: agent
==================================[1m Ai Message [0m==================================
Hello, Alice! How can I assist you today?
请注意,应用程序选择从用户的陈述中提取知识三元组。
for chunk in graph.stream(
{"messages": [("user", "My friend John likes Pizza.")]}, config=config
):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': []}
Update from node: agent
==================================[1m Ai Message [0m==================================
Tool Calls:
save_recall_memory (call_EQSZlvZLZpPa0OGS5Kyzy2Yz)
Call ID: call_EQSZlvZLZpPa0OGS5Kyzy2Yz
Args:
memories: [{'subject': 'Alice', 'predicate': 'has a friend', 'object_': 'John'}, {'subject': 'John', 'predicate': 'likes', 'object_': 'Pizza'}]
Update from node: tools
=================================[1m Tool Message [0m=================================
Name: save_recall_memory
[{"subject": "Alice", "predicate": "has a friend", "object_": "John"}, {"subject": "John", "predicate": "likes", "object_": "Pizza"}]
Update from node: agent
==================================[1m Ai Message [0m==================================
Got it! If you need any suggestions related to pizza or anything else, feel free to ask. What else is on your mind today?
和之前一样,从一个线程生成的记忆可以在同一用户的另一个线程中访问。
config = {"configurable": {"user_id": "3", "thread_id": "2"}}
for chunk in graph.stream(
{"messages": [("user", "What food should I bring to John's party?")]}, config=config
):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ['John likes Pizza', 'Alice has a friend John']}
Update from node: agent
==================================[1m Ai Message [0m==================================
Since John likes pizza, bringing some delicious pizza would be a great choice for the party. You might also consider asking if there are any specific toppings he prefers or if there are any dietary restrictions among the guests. This way, you can ensure everyone enjoys the food!
可选地,为了说明目的,我们可以可视化模型提取的知识图。
%pip install -U --quiet matplotlib networkx
import matplotlib.pyplot as plt
import networkx as nx
# Fetch records
records = recall_vector_store.similarity_search(
"Alice", k=2, filter=lambda doc: doc.metadata["user_id"] == "3"
)
# Plot graph
plt.figure(figsize=(6, 4), dpi=80)
G = nx.DiGraph()
for record in records:
G.add_edge(
record.metadata["subject"],
record.metadata["object_"],
label=record.metadata["predicate"],
)
pos = nx.spring_layout(G)
nx.draw(
G,
pos,
with_labels=True,
node_size=3000,
node_color="lightblue",
font_size=10,
font_weight="bold",
arrows=True,
)
edge_labels = nx.get_edge_attributes(G, "label")
nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, font_color="red")
plt.show()