从MapReduceDocumentsChain迁移
MapReduceDocumentsChain 实现了对(可能很长的)文本的 Map-Reduce 策略。其策略如下:
- 将文本拆分为更小的文档;
- 将一个过程映射到较小的文档上;
- 归约或合并这些过程的结果,生成最终结果。
请注意,映射步骤通常在输入文档之间并行执行。
在此上下文中,常见的应用过程是总结,其中映射步骤总结单个文档,归约步骤生成这些总结的总结。
在归约步骤中,`MapReduceDocumentsChain` 支持递归地“折叠”总结:输入将根据令牌限制进行分区,并为每个分区生成总结。此步骤将重复进行,直到总结的总长度在所需限制内,从而允许对任意长度的文本进行总结。这对于上下文窗口较小的模型尤其有用。
LangGraph 支持Map-Reduce 工作流,并为解决此问题带来多项优势:
- LangGraph 允许单独的步骤(例如连续摘要)进行流式传输,从而更好地控制执行;
- LangGraph 的检查点支持错误恢复,通过人机协作工作流进行扩展,并更容易集成到对话式应用程序中。
- LangGraph 的实现更容易扩展,我们将在下文看到。
下面我们将分别介绍 `MapReduceDocumentsChain` 和相应的 LangGraph 实现,首先通过一个简单的示例进行说明,然后通过一个较长的示例文本来演示递归归约步骤。
我们首先加载一个聊天模型
pip install -qU "langchain[google-genai]"
import getpass
import os
if not os.environ.get("GOOGLE_API_KEY"):
os.environ["GOOGLE_API_KEY"] = getpass.getpass("Enter API key for Google Gemini: ")
from langchain.chat_models import init_chat_model
llm = init_chat_model("gemini-2.0-flash", model_provider="google_genai")
基本示例(短文档)
出于说明目的,我们使用以下 3 个文档。
from langchain_core.documents import Document
documents = [
Document(page_content="Apples are red", metadata={"title": "apple_book"}),
Document(page_content="Blueberries are blue", metadata={"title": "blueberry_book"}),
Document(page_content="Bananas are yelow", metadata={"title": "banana_book"}),
]
旧版
详情
下面我们展示了使用 `MapReduceDocumentsChain` 的实现。我们定义了映射和归约步骤的提示模板,为这些步骤实例化了单独的链,最后实例化了 `MapReduceDocumentsChain`。
from langchain.chains import MapReduceDocumentsChain, ReduceDocumentsChain
from langchain.chains.combine_documents.stuff import StuffDocumentsChain
from langchain.chains.llm import LLMChain
from langchain_core.prompts import ChatPromptTemplate
from langchain_text_splitters import CharacterTextSplitter
# Map
map_template = "Write a concise summary of the following: {docs}."
map_prompt = ChatPromptTemplate([("human", map_template)])
map_chain = LLMChain(llm=llm, prompt=map_prompt)
# Reduce
reduce_template = """
The following is a set of summaries:
{docs}
Take these and distill it into a final, consolidated summary
of the main themes.
"""
reduce_prompt = ChatPromptTemplate([("human", reduce_template)])
reduce_chain = LLMChain(llm=llm, prompt=reduce_prompt)
# Takes a list of documents, combines them into a single string, and passes this to an LLMChain
combine_documents_chain = StuffDocumentsChain(
llm_chain=reduce_chain, document_variable_name="docs"
)
# Combines and iteratively reduces the mapped documents
reduce_documents_chain = ReduceDocumentsChain(
# This is final chain that is called.
combine_documents_chain=combine_documents_chain,
# If documents exceed context for `StuffDocumentsChain`
collapse_documents_chain=combine_documents_chain,
# The maximum number of tokens to group documents into.
token_max=1000,
)
# Combining documents by mapping a chain over them, then combining results
map_reduce_chain = MapReduceDocumentsChain(
# Map chain
llm_chain=map_chain,
# Reduce chain
reduce_documents_chain=reduce_documents_chain,
# The variable name in the llm_chain to put the documents in
document_variable_name="docs",
# Return the results of the map steps in the output
return_intermediate_steps=False,
)
result = map_reduce_chain.invoke(documents)
print(result["output_text"])
Fruits come in a variety of colors, with apples being red, blueberries being blue, and bananas being yellow.
在 LangSmith 追踪中,我们观察到四次 LLM 调用:一次总结三个输入文档中的每一个,另一次总结这些总结。
LangGraph
下面我们展示了 LangGraph 的实现,使用与上述相同的提示模板。该图包含一个用于生成总结的节点,该节点映射到输入文档列表。然后,该节点流向第二个节点,该节点生成最终总结。
详情
我们需要安装 langgraph
%pip install -qU langgraph
import operator
from typing import Annotated, List, TypedDict
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langgraph.constants import Send
from langgraph.graph import END, START, StateGraph
map_template = "Write a concise summary of the following: {context}."
reduce_template = """
The following is a set of summaries:
{docs}
Take these and distill it into a final, consolidated summary
of the main themes.
"""
map_prompt = ChatPromptTemplate([("human", map_template)])
reduce_prompt = ChatPromptTemplate([("human", reduce_template)])
map_chain = map_prompt | llm | StrOutputParser()
reduce_chain = reduce_prompt | llm | StrOutputParser()
# Graph components: define the components that will make up the graph
# This will be the overall state of the main graph.
# It will contain the input document contents, corresponding
# summaries, and a final summary.
class OverallState(TypedDict):
# Notice here we use the operator.add
# This is because we want combine all the summaries we generate
# from individual nodes back into one list - this is essentially
# the "reduce" part
contents: List[str]
summaries: Annotated[list, operator.add]
final_summary: str
# This will be the state of the node that we will "map" all
# documents to in order to generate summaries
class SummaryState(TypedDict):
content: str
# Here we generate a summary, given a document
async def generate_summary(state: SummaryState):
response = await map_chain.ainvoke(state["content"])
return {"summaries": [response]}
# Here we define the logic to map out over the documents
# We will use this an edge in the graph
def map_summaries(state: OverallState):
# We will return a list of `Send` objects
# Each `Send` object consists of the name of a node in the graph
# as well as the state to send to that node
return [
Send("generate_summary", {"content": content}) for content in state["contents"]
]
# Here we will generate the final summary
async def generate_final_summary(state: OverallState):
response = await reduce_chain.ainvoke(state["summaries"])
return {"final_summary": response}
# Construct the graph: here we put everything together to construct our graph
graph = StateGraph(OverallState)
graph.add_node("generate_summary", generate_summary)
graph.add_node("generate_final_summary", generate_final_summary)
graph.add_conditional_edges(START, map_summaries, ["generate_summary"])
graph.add_edge("generate_summary", "generate_final_summary")
graph.add_edge("generate_final_summary", END)
app = graph.compile()
from IPython.display import Image
Image(app.get_graph().draw_mermaid_png())
请注意,以流模式调用图允许我们监视步骤并在执行期间对其采取行动。
# Call the graph:
async for step in app.astream({"contents": [doc.page_content for doc in documents]}):
print(step)
{'generate_summary': {'summaries': ['Apples are typically red in color.']}}
{'generate_summary': {'summaries': ['Bananas are yellow in color.']}}
{'generate_summary': {'summaries': ['Blueberries are a type of fruit that are blue in color.']}}
{'generate_final_summary': {'final_summary': 'The main themes are the colors of different fruits: apples are red, blueberries are blue, and bananas are yellow.'}}
在 LangSmith 追踪中,我们恢复了与之前相同的四次 LLM 调用。
总结长文档
当文本相对于 LLM 的上下文窗口较长时,Map-Reduce 流程尤其有用。 `MapReduceDocumentsChain` 支持递归地“折叠”总结:输入根据令牌限制进行分区,并为每个分区生成总结。此步骤重复进行,直到总结的总长度在所需限制内,从而允许对任意长度的文本进行总结。
这个“折叠”步骤在 `MapReduceDocumentsChain` 内部实现为一个 `while` 循环。我们可以通过一个较长的文本来演示此步骤,即 Lilian Weng 的一篇名为 LLM Powered Autonomous Agents 的博客文章(如 RAG 教程 和其他文档中所示)。
首先我们加载文章并将其分割成更小的“子文档”
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import CharacterTextSplitter
loader = WebBaseLoader("https://lilianweng.github.io/posts/2023-06-23-agent/")
documents = loader.load()
text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
chunk_size=1000, chunk_overlap=0
)
split_docs = text_splitter.split_documents(documents)
print(f"Generated {len(split_docs)} documents.")
USER_AGENT environment variable not set, consider setting it to identify your requests.
Created a chunk of size 1003, which is longer than the specified 1000
``````output
Generated 14 documents.
传统方式
详情
我们可以像以前一样调用 `MapReduceDocumentsChain`。
result = map_reduce_chain.invoke(split_docs)
print(result["output_text"])
The article discusses the use of Large Language Models (LLMs) to power autonomous agents in various tasks, showcasing their capabilities in problem-solving beyond generating written content. Key components such as planning, memory optimization, and tool use are explored, with proof-of-concept demos like AutoGPT and GPT-Engineer demonstrating the potential of LLM-powered agents. Challenges include limitations in historical information retention and natural language interface reliability, while the potential of LLMs in enhancing reasoning, problem-solving, and planning proficiency for autonomous agents is highlighted. Overall, the article emphasizes the versatility and power of LLMs in creating intelligent agents for tasks like scientific discovery and experiment design.
请考虑上述调用的 LangSmith 追踪。在实例化 `ReduceDocumentsChain` 时,我们将 `token_max` 设置为 1,000 个令牌。这导致总共有 17 次 LLM 调用:
- 其中 14 次调用用于总结文本分割器生成的 14 个子文档。
- 这生成了总计约 1,000 - 2,000 个令牌的总结。由于我们将 `token_max` 设置为 1,000,因此还有两次调用用于总结(或“折叠”)这些总结。
- 最后一次调用是生成两个“折叠后”总结的最终总结。
LangGraph
详情
我们可以扩展 LangGraph 中我们最初的 Map-Reduce 实现,以实现相同的递归折叠步骤。我们进行以下更改:
- 向状态添加 `collapsed_summaries` 键,以存储折叠后的总结;
- 更新最终总结节点,以总结折叠后的总结;
- 添加一个 `collapse_summaries` 节点,该节点根据令牌长度(此处仍为 1,000 个令牌)对文档列表进行分区,并生成每个分区的总结,并将结果存储在 `collapsed_summaries` 中。
我们从 `collapse_summaries` 到自身添加一个条件边以形成循环:如果折叠后的总结总长度超过 `token_max`,我们重新运行该节点。
from typing import Literal
from langchain.chains.combine_documents.reduce import (
acollapse_docs,
split_list_of_docs,
)
def length_function(documents: List[Document]) -> int:
"""Get number of tokens for input contents."""
return sum(llm.get_num_tokens(doc.page_content) for doc in documents)
token_max = 1000
class OverallState(TypedDict):
contents: List[str]
summaries: Annotated[list, operator.add]
collapsed_summaries: List[Document] # add key for collapsed summaries
final_summary: str
# Add node to store summaries for collapsing
def collect_summaries(state: OverallState):
return {
"collapsed_summaries": [Document(summary) for summary in state["summaries"]]
}
# Modify final summary to read off collapsed summaries
async def generate_final_summary(state: OverallState):
response = await reduce_chain.ainvoke(state["collapsed_summaries"])
return {"final_summary": response}
graph = StateGraph(OverallState)
graph.add_node("generate_summary", generate_summary) # same as before
graph.add_node("collect_summaries", collect_summaries)
graph.add_node("generate_final_summary", generate_final_summary)
# Add node to collapse summaries
async def collapse_summaries(state: OverallState):
doc_lists = split_list_of_docs(
state["collapsed_summaries"], length_function, token_max
)
results = []
for doc_list in doc_lists:
results.append(await acollapse_docs(doc_list, reduce_chain.ainvoke))
return {"collapsed_summaries": results}
graph.add_node("collapse_summaries", collapse_summaries)
def should_collapse(
state: OverallState,
) -> Literal["collapse_summaries", "generate_final_summary"]:
num_tokens = length_function(state["collapsed_summaries"])
if num_tokens > token_max:
return "collapse_summaries"
else:
return "generate_final_summary"
graph.add_conditional_edges(START, map_summaries, ["generate_summary"])
graph.add_edge("generate_summary", "collect_summaries")
graph.add_conditional_edges("collect_summaries", should_collapse)
graph.add_conditional_edges("collapse_summaries", should_collapse)
graph.add_edge("generate_final_summary", END)
app = graph.compile()
LangGraph 允许绘制图结构以帮助可视化其功能
from IPython.display import Image
Image(app.get_graph().draw_mermaid_png())
如前所述,我们可以流式传输图以观察其步骤序列。下面,我们将只打印出步骤的名称。
请注意,由于图中存在循环,因此指定执行的递归限制会很有帮助。这类似于ReduceDocumentsChain.token_max,当超过指定限制时,它将引发特定错误。
async for step in app.astream(
{"contents": [doc.page_content for doc in split_docs]},
{"recursion_limit": 10},
):
print(list(step.keys()))
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['collect_summaries']
['collapse_summaries']
['generate_final_summary']
print(step)
{'generate_final_summary': {'final_summary': 'The summaries discuss the use of Large Language Models (LLMs) to power autonomous agents in various tasks such as problem-solving, planning, and tool use. Key components like planning, memory, and task decomposition are highlighted, along with challenges such as inefficient planning and hallucination. Techniques like Algorithm Distillation and Maximum Inner Product Search are explored for optimization, while frameworks like ReAct and Reflexion show improvements in knowledge-intensive tasks. The importance of accurate interpretation of user input and well-structured code for functional autonomy is emphasized, along with the potential of LLMs in prompting, reasoning, and emergent social behavior in simulation environments. Challenges in real-world scenarios and the use of LLMs with expert-designed tools for tasks like organic synthesis and drug discovery are also discussed.'}}
在相应的 LangSmith 追踪中,我们可以看到与之前相同的 17 次 LLM 调用,这次它们被分组到各自的节点下。
下一步
有关使用 LangGraph 进行构建的详细信息,请查看 LangGraph 文档,包括关于 LangGraph 中 Map-Reduce 细节的此指南。
有关更多基于 LLM 的总结策略,请参阅本教程。