跳到主要内容
Open In ColabOpen on GitHub

如何从RAG应用程序流式传输结果

本指南解释了如何从RAG应用程序中流式传输结果。它涵盖了从最终输出中流式传输令牌,以及链的中间步骤(例如,从查询重写中)。

我们将基于Lilian Weng在《LLM驱动的自主智能体》博客文章中,并在RAG教程中构建的带来源的问答应用来展开。

设置

依赖项

我们将使用以下包

%pip install --upgrade --quiet  langchain langchain-community langchainhub beautifulsoup4

LangSmith

你用LangChain构建的许多应用程序将包含多个步骤,其中有多次LLM调用。随着这些应用程序变得越来越复杂,能够检查链或代理内部发生了什么变得至关重要。做到这一点的最佳方法是使用LangSmith

请注意,LangSmith并非必需,但它会很有帮助。如果您确实想使用LangSmith,在通过上述链接注册后,请务必设置您的环境变量以开始记录跟踪信息

os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_API_KEY"] = getpass.getpass()

组件

我们需要从LangChain的集成套件中选择三个组件。

一个聊天模型

pip install -qU "langchain[google-genai]"
import getpass
import os

if not os.environ.get("GOOGLE_API_KEY"):
os.environ["GOOGLE_API_KEY"] = getpass.getpass("Enter API key for Google Gemini: ")

from langchain.chat_models import init_chat_model

llm = init_chat_model("gemini-2.0-flash", model_provider="google_genai")

一个嵌入模型

pip install -qU langchain-openai
import getpass
import os

if not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

和一个向量存储

pip install -qU langchain-core
from langchain_core.vectorstores import InMemoryVectorStore

vector_store = InMemoryVectorStore(embeddings)

RAG应用程序

让我们重建在Lilian Weng的《LLM驱动的自主智能体》博客文章中,并在RAG教程中构建的带来源的问答应用。

首先,我们索引文档

import bs4
from langchain import hub
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from typing_extensions import List, TypedDict

# Load and chunk contents of the blog
loader = WebBaseLoader(
web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
bs_kwargs=dict(
parse_only=bs4.SoupStrainer(
class_=("post-content", "post-title", "post-header")
)
),
)
docs = loader.load()

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
all_splits = text_splitter.split_documents(docs)
# Index chunks
_ = vector_store.add_documents(documents=all_splits)

接下来我们构建应用程序

from langchain import hub
from langchain_core.documents import Document
from langgraph.graph import START, StateGraph
from typing_extensions import List, TypedDict

# Define prompt for question-answering
prompt = hub.pull("rlm/rag-prompt")


# Define state for application
class State(TypedDict):
question: str
context: List[Document]
answer: str


# Define application steps
def retrieve(state: State):
retrieved_docs = vector_store.similarity_search(state["question"])
return {"context": retrieved_docs}


def generate(state: State):
docs_content = "\n\n".join(doc.page_content for doc in state["context"])
messages = prompt.invoke({"question": state["question"], "context": docs_content})
response = llm.invoke(messages)
return {"answer": response.content}


# Compile application and test
graph_builder = StateGraph(State).add_sequence([retrieve, generate])
graph_builder.add_edge(START, "retrieve")
graph = graph_builder.compile()
API参考:hub | Document | StateGraph
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

流式传输最终输出

LangGraph支持多种流式传输模式,可以通过指定stream_mode参数来控制。将stream_mode="messages"设置为允许我们从聊天模型调用中流式传输令牌。

通常,一个应用程序中可以有多个聊天模型调用(尽管这里只有一个)。下面,我们使用相应节点的名称,仅筛选出最后一步。

input_message = "What is Task Decomposition?"

for message, metadata in graph.stream(
{"question": "What is Task Decomposition?"},
stream_mode="messages",
):
if metadata["langgraph_node"] == "generate":
print(message.content, end="|")
|Task| De|composition| is| a| technique| used| to| break| down| complex| tasks| into| smaller|,| more| manageable| steps|.| It| often| involves| prompting| models| to| "|think| step| by| step|,"| allowing| for| clearer| reasoning| and| better| performance| on| intricate| problems|.| This| can| be| achieved| through| various| methods|,| including| simple| prompts|,| task|-specific| instructions|,| or| human| input|.||

流式传输中间步骤

其他流式传输模式通常会从我们的调用中流式传输步骤——即,来自各个节点的状态更新。在这种情况下,每个节点只是向状态追加一个新键。

for step in graph.stream(
{"question": "What is Task Decomposition?"},
stream_mode="updates",
):
print(f"{step}\n\n----------------\n")
{'retrieve': {'context': [Document(id='5bf5e308-6ccb-4f09-94d2-d0c36b8c9980', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}, page_content='Fig. 1. Overview of a LLM-powered autonomous agent system.\nComponent One: Planning#\nA complicated task usually involves many steps. An agent needs to know what they are and plan ahead.\nTask Decomposition#\nChain of thought (CoT; Wei et al. 2022) has become a standard prompting technique for enhancing model performance on complex tasks. The model is instructed to “think step by step” to utilize more test-time computation to decompose hard tasks into smaller and simpler steps. CoT transforms big tasks into multiple manageable tasks and shed lights into an interpretation of the model’s thinking process.'), Document(id='d8aed221-7943-414d-8ed7-63c2b0e7523b', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}, page_content='Tree of Thoughts (Yao et al. 2023) extends CoT by exploring multiple reasoning possibilities at each step. It first decomposes the problem into multiple thought steps and generates multiple thoughts per step, creating a tree structure. The search process can be BFS (breadth-first search) or DFS (depth-first search) with each state evaluated by a classifier (via a prompt) or majority vote.\nTask decomposition can be done (1) by LLM with simple prompting like "Steps for XYZ.\\n1.", "What are the subgoals for achieving XYZ?", (2) by using task-specific instructions; e.g. "Write a story outline." for writing a novel, or (3) with human inputs.'), Document(id='bfa87007-02ef-4f81-a008-4522ecea1025', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}, page_content='Resources:\n1. Internet access for searches and information gathering.\n2. Long Term memory management.\n3. GPT-3.5 powered Agents for delegation of simple tasks.\n4. File output.\n\nPerformance Evaluation:\n1. Continuously review and analyze your actions to ensure you are performing to the best of your abilities.\n2. Constructively self-criticize your big-picture behavior constantly.\n3. Reflect on past decisions and strategies to refine your approach.\n4. Every command has a cost, so be smart and efficient. Aim to complete tasks in the least number of steps.'), Document(id='6aff7fc0-5c21-4986-9f1e-91e89715d934', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}, page_content="(3) Task execution: Expert models execute on the specific tasks and log results.\nInstruction:\n\nWith the input and the inference results, the AI assistant needs to describe the process and results. The previous stages can be formed as - User Input: {{ User Input }}, Task Planning: {{ Tasks }}, Model Selection: {{ Model Assignment }}, Task Execution: {{ Predictions }}. You must first answer the user's request in a straightforward manner. Then describe the task process and show your analysis and model inference results to the user in the first person. If inference results contain a file path, must tell the user the complete file path.")]}}

----------------

{'generate': {'answer': 'Task Decomposition is the process of breaking down a complex task into smaller, manageable steps to enhance understanding and execution. Techniques like Chain of Thought (CoT) and Tree of Thoughts (ToT) guide models to think through steps systematically, allowing for better problem-solving. It can be achieved through simple prompting, task-specific instructions, or human input.'}}

----------------

有关LangGraph流式传输的更多信息,请查看其流式传输文档。有关流式传输单个LangChain Runnables的更多信息,请参阅本指南