跳转到主要内容

长期记忆代理

本教程展示了如何使用 LangGraph 实现具有长期记忆能力的代理。该代理可以存储、检索和使用记忆来增强其与用户的交互。

MemGPT 等论文的启发,并从我们自己关于长期记忆的工作中提炼而来,该图从聊天交互中提取记忆并将它们持久化到数据库中。本教程中的“记忆”将以两种方式表示

  • 由代理生成的文本信息
  • (主体, 谓词, 客体) 知识三元组形式表示的代理提取的关于实体的结构化信息。

此信息稍后可以被读取或语义查询,以便在您的机器人响应特定用户时提供个性化上下文。

核心思想是,通过保存记忆,代理可以持久化关于用户的跨多个对话(线程)共享的信息,这与 LangGraph 的持久化功能所实现的单个对话的记忆不同。

memory_graph.png

你还可以在这个仓库中查看此代理的完整实现。

安装依赖

%pip install -U --quiet langgraph langchain-openai langchain-community tiktoken
import getpass
import os


def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("OPENAI_API_KEY")
_set_env("TAVILY_API_KEY")
OPENAI_API_KEY:  ········
TAVILY_API_KEY: ········
import json
from typing import List, Literal, Optional

import tiktoken
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.messages import get_buffer_string
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import tool
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import ChatOpenAI
from langchain_openai.embeddings import OpenAIEmbeddings
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph.prebuilt import ToolNode

定义记忆的向量存储

首先,让我们定义将要存储记忆的向量存储。记忆将以嵌入形式存储,稍后将基于对话上下文查找。我们将使用内存中的向量存储。

recall_vector_store = InMemoryVectorStore(OpenAIEmbeddings())

定义工具

接下来,让我们定义我们的记忆工具。我们需要一个工具来存储记忆,另一个工具来搜索记忆以找到最相关的记忆。

import uuid


def get_user_id(config: RunnableConfig) -> str:
user_id = config["configurable"].get("user_id")
if user_id is None:
raise ValueError("User ID needs to be provided to save a memory.")

return user_id


@tool
def save_recall_memory(memory: str, config: RunnableConfig) -> str:
"""Save memory to vectorstore for later semantic retrieval."""
user_id = get_user_id(config)
document = Document(
page_content=memory, id=str(uuid.uuid4()), metadata={"user_id": user_id}
)
recall_vector_store.add_documents([document])
return memory


@tool
def search_recall_memories(query: str, config: RunnableConfig) -> List[str]:
"""Search for relevant memories."""
user_id = get_user_id(config)

def _filter_function(doc: Document) -> bool:
return doc.metadata.get("user_id") == user_id

documents = recall_vector_store.similarity_search(
query, k=3, filter=_filter_function
)
return [document.page_content for document in documents]

此外,让我们为我们的代理提供使用 Tavily 搜索网络的能力。

search = TavilySearchResults(max_results=1)
tools = [save_recall_memory, search_recall_memories, search]

定义状态、节点和边

我们的图状态将仅包含两个通道—— messages 用于跟踪聊天历史记录,以及 recall_memories —— 在第一次调用代理之前拉入并传递给代理系统提示的上下文记忆。

class State(MessagesState):
# add memories that will be retrieved based on the conversation context
recall_memories: List[str]
# Define the prompt template for the agent
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are a helpful assistant with advanced long-term memory"
" capabilities. Powered by a stateless LLM, you must rely on"
" external memory to store information between conversations."
" Utilize the available memory tools to store and retrieve"
" important details that will help you better attend to the user's"
" needs and understand their context.\n\n"
"Memory Usage Guidelines:\n"
"1. Actively use memory tools (save_core_memory, save_recall_memory)"
" to build a comprehensive understanding of the user.\n"
"2. Make informed suppositions and extrapolations based on stored"
" memories.\n"
"3. Regularly reflect on past interactions to identify patterns and"
" preferences.\n"
"4. Update your mental model of the user with each new piece of"
" information.\n"
"5. Cross-reference new information with existing memories for"
" consistency.\n"
"6. Prioritize storing emotional context and personal values"
" alongside facts.\n"
"7. Use memory to anticipate needs and tailor responses to the"
" user's style.\n"
"8. Recognize and acknowledge changes in the user's situation or"
" perspectives over time.\n"
"9. Leverage memories to provide personalized examples and"
" analogies.\n"
"10. Recall past challenges or successes to inform current"
" problem-solving.\n\n"
"## Recall Memories\n"
"Recall memories are contextually retrieved based on the current"
" conversation:\n{recall_memories}\n\n"
"## Instructions\n"
"Engage with the user naturally, as a trusted colleague or friend."
" There's no need to explicitly mention your memory capabilities."
" Instead, seamlessly incorporate your understanding of the user"
" into your responses. Be attentive to subtle cues and underlying"
" emotions. Adapt your communication style to match the user's"
" preferences and current emotional state. Use tools to persist"
" information you want to retain in the next conversation. If you"
" do call tools, all text preceding the tool call is an internal"
" message. Respond AFTER calling the tool, once you have"
" confirmation that the tool completed successfully.\n\n",
),
("placeholder", "{messages}"),
]
)
model = ChatOpenAI(model_name="gpt-4o")
model_with_tools = model.bind_tools(tools)

tokenizer = tiktoken.encoding_for_model("gpt-4o")


def agent(state: State) -> State:
"""Process the current state and generate a response using the LLM.

Args:
state (schemas.State): The current state of the conversation.

Returns:
schemas.State: The updated state with the agent's response.
"""
bound = prompt | model_with_tools
recall_str = (
"<recall_memory>\n" + "\n".join(state["recall_memories"]) + "\n</recall_memory>"
)
prediction = bound.invoke(
{
"messages": state["messages"],
"recall_memories": recall_str,
}
)
return {
"messages": [prediction],
}


def load_memories(state: State, config: RunnableConfig) -> State:
"""Load memories for the current conversation.

Args:
state (schemas.State): The current state of the conversation.
config (RunnableConfig): The runtime configuration for the agent.

Returns:
State: The updated state with loaded memories.
"""
convo_str = get_buffer_string(state["messages"])
convo_str = tokenizer.decode(tokenizer.encode(convo_str)[:2048])
recall_memories = search_recall_memories.invoke(convo_str, config)
return {
"recall_memories": recall_memories,
}


def route_tools(state: State):
"""Determine whether to use tools or end the conversation based on the last message.

Args:
state (schemas.State): The current state of the conversation.

Returns:
Literal["tools", "__end__"]: The next step in the graph.
"""
msg = state["messages"][-1]
if msg.tool_calls:
return "tools"

return END

构建图

我们的代理图将与简单的 ReAct 代理非常相似。唯一重要的修改是添加一个节点,以便在第一次调用代理之前加载记忆。

# Create the graph and add nodes
builder = StateGraph(State)
builder.add_node(load_memories)
builder.add_node(agent)
builder.add_node("tools", ToolNode(tools))

# Add edges to the graph
builder.add_edge(START, "load_memories")
builder.add_edge("load_memories", "agent")
builder.add_conditional_edges("agent", route_tools, ["tools", END])
builder.add_edge("tools", "agent")

# Compile the graph
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

运行代理!

让我们第一次运行代理,并告诉它一些关于用户的信息!

def pretty_print_stream_chunk(chunk):
for node, updates in chunk.items():
print(f"Update from node: {node}")
if "messages" in updates:
updates["messages"][-1].pretty_print()
else:
print(updates)

print("\n")
# NOTE: we're specifying `user_id` to save memories for a given user
config = {"configurable": {"user_id": "1", "thread_id": "1"}}

for chunk in graph.stream({"messages": [("user", "my name is John")]}, config=config):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': []}


Update from node: agent
================================== Ai Message ==================================
Tool Calls:
save_recall_memory (call_OqfbWodmrywjMnB1v3p19QLt)
Call ID: call_OqfbWodmrywjMnB1v3p19QLt
Args:
memory: User's name is John.


Update from node: tools
================================= Tool Message =================================
Name: save_recall_memory

User's name is John.


Update from node: agent
================================== Ai Message ==================================

Nice to meet you, John! How can I assist you today?

你可以看到代理保存了关于用户姓名的记忆。让我们添加更多关于用户的信息!

for chunk in graph.stream({"messages": [("user", "i love pizza")]}, config=config):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ["User's name is John."]}


Update from node: agent
================================== Ai Message ==================================
Tool Calls:
save_recall_memory (call_xxEivMuWCURJrGxMZb02Eh31)
Call ID: call_xxEivMuWCURJrGxMZb02Eh31
Args:
memory: John loves pizza.


Update from node: tools
================================= Tool Message =================================
Name: save_recall_memory

John loves pizza.


Update from node: agent
================================== Ai Message ==================================

Pizza is amazing! Do you have a favorite type or topping?
for chunk in graph.stream(
{"messages": [("user", "yes -- pepperoni!")]},
config={"configurable": {"user_id": "1", "thread_id": "1"}},
):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ["User's name is John.", 'John loves pizza.']}


Update from node: agent
================================== Ai Message ==================================
Tool Calls:
save_recall_memory (call_AFrtCVwIEr48Fim80zlhe6xg)
Call ID: call_AFrtCVwIEr48Fim80zlhe6xg
Args:
memory: John's favorite pizza topping is pepperoni.


Update from node: tools
================================= Tool Message =================================
Name: save_recall_memory

John's favorite pizza topping is pepperoni.


Update from node: agent
================================== Ai Message ==================================

Pepperoni is a classic choice! Do you have a favorite pizza place, or do you enjoy making it at home?
for chunk in graph.stream(
{"messages": [("user", "i also just moved to new york")]},
config={"configurable": {"user_id": "1", "thread_id": "1"}},
):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ["User's name is John.", 'John loves pizza.', "John's favorite pizza topping is pepperoni."]}


Update from node: agent
================================== Ai Message ==================================
Tool Calls:
save_recall_memory (call_Na86uY9eBzaJ0sS0GM4Z9tSf)
Call ID: call_Na86uY9eBzaJ0sS0GM4Z9tSf
Args:
memory: John just moved to New York.


Update from node: tools
================================= Tool Message =================================
Name: save_recall_memory

John just moved to New York.


Update from node: agent
================================== Ai Message ==================================

Welcome to New York! That's a fantastic place for a pizza lover. Have you had a chance to explore any of the famous pizzerias there yet?

现在,我们可以在不同的线程中使用已保存的关于用户的信息。让我们试试看

config = {"configurable": {"user_id": "1", "thread_id": "2"}}

for chunk in graph.stream(
{"messages": [("user", "where should i go for dinner?")]}, config=config
):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ['John loves pizza.', "User's name is John.", 'John just moved to New York.']}


Update from node: agent
================================== Ai Message ==================================

Considering you just moved to New York and love pizza, I'd recommend checking out some of the iconic pizza places in the city. Some popular spots include:

1. **Di Fara Pizza** in Brooklyn – Known for its classic New York-style pizza.
2. **Joe's Pizza** in Greenwich Village – A historic pizzeria with a great reputation.
3. **Lucali** in Carroll Gardens, Brooklyn – Often ranked among the best for its delicious thin-crust pies.

Would you like more recommendations or information about any of these places?

请注意,代理在回答之前是如何加载最相关的记忆的,在我们的示例中,它根据食物偏好和位置提出了晚餐建议。

最后,让我们将搜索工具与其余的对话上下文和记忆一起使用,以查找披萨店的位置

for chunk in graph.stream(
{"messages": [("user", "what's the address for joe's in greenwich village?")]},
config=config,
):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ['John loves pizza.', 'John just moved to New York.', "John's favorite pizza topping is pepperoni."]}


Update from node: agent
================================== Ai Message ==================================
Tool Calls:
tavily_search_results_json (call_aespiB28jpTFvaC4d0qpfY6t)
Call ID: call_aespiB28jpTFvaC4d0qpfY6t
Args:
query: Joe's Pizza Greenwich Village NYC address


Update from node: tools
================================= Tool Message =================================
Name: tavily_search_results_json

[{"url": "https://www.joespizzanyc.com/locations-1-1", "content": "Joe's Pizza Greenwich Village (Original Location) 7 Carmine Street New York, NY 10014 (212) 366-1182 Joe's Pizza Times Square 1435 Broadway New York, NY 10018 (646) 559-4878. TIMES SQUARE MENU. ORDER JOE'S TIMES SQUARE Joe's Pizza Williamsburg 216 Bedford Avenue Brooklyn, NY 11249"}]


Update from node: agent
================================== Ai Message ==================================

The address for Joe's Pizza in Greenwich Village is:

**7 Carmine Street, New York, NY 10014**

Enjoy your pizza!

如果你传递不同的用户 ID,代理的响应将不会个性化,因为我们没有保存关于其他用户的任何信息

添加结构化记忆

到目前为止,我们将记忆表示为字符串,例如 "John 喜欢披萨"。这是将记忆持久化到向量存储时的自然表示。如果你的用例可以从其他持久化后端(例如图形数据库)中受益,我们可以更新我们的应用程序以生成具有附加结构的记忆。

下面,我们更新 save_recall_memory 工具以接受“知识三元组”列表,或包含 subjectpredicateobject 的 3 元组,适合存储在知识图中。然后,我们的模型将生成这些表示,作为其工具调用的一部分。

为简单起见,我们使用与之前相同的向量数据库,但 save_recall_memorysearch_recall_memories 工具可以进一步更新,以与图形数据库交互。现在,我们只需要更新 save_recall_memory 工具

recall_vector_store = InMemoryVectorStore(OpenAIEmbeddings())
from typing_extensions import TypedDict


class KnowledgeTriple(TypedDict):
subject: str
predicate: str
object_: str


@tool
def save_recall_memory(memories: List[KnowledgeTriple], config: RunnableConfig) -> str:
"""Save memory to vectorstore for later semantic retrieval."""
user_id = get_user_id(config)
for memory in memories:
serialized = " ".join(memory.values())
document = Document(
serialized,
id=str(uuid.uuid4()),
metadata={
"user_id": user_id,
**memory,
},
)
recall_vector_store.add_documents([document])
return memories

然后,我们可以像以前一样编译该图

tools = [save_recall_memory, search_recall_memories, search]
model_with_tools = model.bind_tools(tools)


# Create the graph and add nodes
builder = StateGraph(State)
builder.add_node(load_memories)
builder.add_node(agent)
builder.add_node("tools", ToolNode(tools))

# Add edges to the graph
builder.add_edge(START, "load_memories")
builder.add_edge("load_memories", "agent")
builder.add_conditional_edges("agent", route_tools, ["tools", END])
builder.add_edge("tools", "agent")

# Compile the graph
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
config = {"configurable": {"user_id": "3", "thread_id": "1"}}

for chunk in graph.stream({"messages": [("user", "Hi, I'm Alice.")]}, config=config):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': []}


Update from node: agent
================================== Ai Message ==================================

Hello, Alice! How can I assist you today?

请注意,该应用程序选择从用户的语句中提取知识三元组

for chunk in graph.stream(
{"messages": [("user", "My friend John likes Pizza.")]}, config=config
):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': []}


Update from node: agent
================================== Ai Message ==================================
Tool Calls:
save_recall_memory (call_EQSZlvZLZpPa0OGS5Kyzy2Yz)
Call ID: call_EQSZlvZLZpPa0OGS5Kyzy2Yz
Args:
memories: [{'subject': 'Alice', 'predicate': 'has a friend', 'object_': 'John'}, {'subject': 'John', 'predicate': 'likes', 'object_': 'Pizza'}]


Update from node: tools
================================= Tool Message =================================
Name: save_recall_memory

[{"subject": "Alice", "predicate": "has a friend", "object_": "John"}, {"subject": "John", "predicate": "likes", "object_": "Pizza"}]


Update from node: agent
================================== Ai Message ==================================

Got it! If you need any suggestions related to pizza or anything else, feel free to ask. What else is on your mind today?

和以前一样,从一个线程生成的记忆可以在来自同一用户的另一个线程中访问

config = {"configurable": {"user_id": "3", "thread_id": "2"}}

for chunk in graph.stream(
{"messages": [("user", "What food should I bring to John's party?")]}, config=config
):
pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ['John likes Pizza', 'Alice has a friend John']}


Update from node: agent
================================== Ai Message ==================================

Since John likes pizza, bringing some delicious pizza would be a great choice for the party. You might also consider asking if there are any specific toppings he prefers or if there are any dietary restrictions among the guests. This way, you can ensure everyone enjoys the food!

可选地,为了说明目的,我们可以可视化模型提取的知识图

%pip install -U --quiet matplotlib networkx
import matplotlib.pyplot as plt
import networkx as nx

# Fetch records
records = recall_vector_store.similarity_search(
"Alice", k=2, filter=lambda doc: doc.metadata["user_id"] == "3"
)


# Plot graph
plt.figure(figsize=(6, 4), dpi=80)
G = nx.DiGraph()

for record in records:
G.add_edge(
record.metadata["subject"],
record.metadata["object_"],
label=record.metadata["predicate"],
)

pos = nx.spring_layout(G)
nx.draw(
G,
pos,
with_labels=True,
node_size=3000,
node_color="lightblue",
font_size=10,
font_weight="bold",
arrows=True,
)
edge_labels = nx.get_edge_attributes(G, "label")
nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, font_color="red")
plt.show()


此页是否对您有帮助?