跳转到主要内容

流式传输

流式传输对于增强基于LLM构建的应用程序的响应性至关重要。通过逐步显示输出,即使在完整响应准备就绪之前,流式传输也能显著改善用户体验(UX),特别是在处理LLM的延迟时。

概述

LLM生成完整响应通常会产生几秒钟的延迟,这在具有多次模型调用的复杂应用程序中变得更加明显。幸运的是,LLM会迭代生成响应,允许在生成时显示中间结果。通过流式传输这些中间输出,LangChain可以在LLM驱动的应用程序中实现更流畅的UX,并在其设计的核心提供对流式传输的内置支持。

在本指南中,我们将讨论LLM应用程序中的流式传输,并探索LangChain的流式API如何促进应用程序中各种组件的实时输出。

在LLM应用程序中流式传输什么

在涉及LLM的应用程序中,可以流式传输几种类型的数据,以通过减少感知延迟和提高透明度来改善用户体验。 这些包括

1. 流式传输 LLM 输出

最常见和最关键的流式传输数据是LLM本身生成的输出。 LLM通常需要时间来生成完整的响应,通过实时流式传输输出,用户可以在生成时看到部分结果。这提供了即时反馈,并有助于减少用户的等待时间。

2. 流式传输管道或工作流程进度

除了流式传输 LLM 输出之外,流式传输更复杂的工作流程或管道的进度也很有用,让用户了解应用程序的整体进展情况。这可能包括

  • 在 LangGraph 工作流程中: 使用LangGraph,工作流程由表示各种步骤的节点和边组成。 此处的流式传输涉及跟踪各个节点请求更新时图状态的变化。 这允许更细粒度地监视工作流程中当前处于活动状态的节点,提供有关工作流程在不同阶段进行时的实时更新。

  • 在 LCEL 管道中:LCEL管道流式传输更新涉及捕获各个子可运行对象的进度。例如,当管道的不同步骤或组件执行时,您可以流式传输当前正在运行的子可运行对象,从而提供对整个管道进度的实时洞察。

流式传输管道或工作流程进度对于向用户提供应用程序在执行过程中所处位置的清晰画面至关重要。

3. 流式传输自定义数据

在某些情况下,您可能需要流式传输超出管道或工作流程结构所提供信息的自定义数据。 此自定义信息注入到工作流程的特定步骤中,无论该步骤是工具还是 LangGraph 节点。例如,您可以流式传输有关工具实时执行的操作或 LangGraph 节点的进度更新。这种细粒度数据直接从步骤内发出,提供有关工作流程执行的更详细的见解,并且在需要更高可见性的复杂过程中特别有用。

流式传输 API

LangChain 有两个主要的 API 用于实时流式传输输出。 任何实现可运行接口的组件都支持这些 API,包括LLM编译后的 LangGraph 图,以及使用LCEL生成的任何可运行对象。

  1. 同步 stream 和异步 astream:用于在生成时流式传输单个可运行对象(例如,聊天模型)的输出,或者流式传输使用 LangGraph 创建的任何工作流程。
  2. 仅异步 astream_events:使用此 API 可以访问完全使用LCEL构建的 LLM 应用程序的自定义事件和中间输出。 请注意,此 API 可用,但在使用 LangGraph 时不需要。
注意

此外,还有一个旧版异步 astream_log API。 不建议新项目使用此 API,它比其他流式传输 API 更复杂且功能更少。

stream()astream()

stream() 方法返回一个迭代器,该迭代器在生成时同步产生输出块。 您可以使用 for 循环实时处理每个块。 例如,当使用 LLM 时,这允许在生成时以增量方式流式传输输出,从而减少用户的等待时间。

stream()astream() 方法产生的块类型取决于正在流式传输的组件。 例如,当从LLM流式传输时,每个组件都将是一个AIMessageChunk;但是,对于其他组件,该块可能不同。

stream() 方法返回一个迭代器,该迭代器在生成时产生这些块。 例如,

for chunk in component.stream(some_input):
# IMPORTANT: Keep the processing of each chunk as efficient as possible.
# While you're processing the current chunk, the upstream component is
# waiting to produce the next one. For example, if working with LangGraph,
# graph execution is paused while the current chunk is being processed.
# In extreme cases, this could even result in timeouts (e.g., when llm outputs are
# streamed from an API that has a timeout).
print(chunk)

异步版本 astream() 的工作方式类似,但专为非阻塞工作流程而设计。 您可以在异步代码中使用它来实现相同的实时流式传输行为。

与聊天模型一起使用

当将 stream()astream() 与聊天模型一起使用时,输出会以 AIMessageChunks 的形式进行流式传输,因为它是由 LLM 生成的。 这允许您在 LLM 的输出生成时以增量方式呈现或处理它,这在交互式应用程序或界面中特别有用。

与 LangGraph 一起使用

LangGraph 编译的图是可运行对象,并且支持标准的流式传输 API。

当将 *stream* 和 *astream* 方法与 LangGraph 一起使用时,您可以选择一个或多个 流式传输模式,这些模式允许您控制流式传输的输出类型。 可用的流式传输模式为

  • “values”:为每个步骤发出状态的所有值。
  • “updates”:在每个步骤之后仅发出节点名称和节点返回的更新。
  • “debug”:为每个步骤发出调试事件。
  • “messages”:发出 LLM 消息 逐个令牌
  • “custom”:使用 LangGraph 的 StreamWriter 发出自定义输出。

有关更多信息,请参阅

与 LCEL 一起使用

如果您使用LangChain 的表达式语言 (LCEL) 组合多个可运行对象,则按照惯例,stream()astream() 方法将流式传输链中最后一步的输出。这允许以增量方式流式传输最终处理的结果。LCEL 尝试优化管道中的流式传输延迟,以便尽快获得最后一步的流式传输结果。

astream_events

提示

使用 astream_events API 可以访问完全使用LCEL构建的 LLM 应用程序的自定义数据和中间输出。

虽然此 API 也可用于 LangGraph,但在使用 LangGraph 时通常不需要,因为 streamastream 方法为 LangGraph 图提供了全面的流式传输功能。

对于使用 LCEL 构建的链,.stream() 方法仅流式传输链中最后一步的输出。 这对于某些应用程序可能足够了,但当您构建多个 LLM 调用的更复杂链时,您可能希望在最终输出旁边使用链的中间值。 例如,在构建基于文档的聊天应用程序时,您可能希望返回源以及最终生成的内容。

有几种方法可以使用回调来做到这一点,或者通过以某种方式构建您的链,使其使用类似链接的.assign()调用将中间值传递到末尾,但 LangChain 还包含一个 .astream_events() 方法,该方法将回调的灵活性与 .stream() 的人体工程学相结合。 调用时,它会返回一个迭代器,该迭代器会产生各种类型的事件,您可以根据项目的需要对其进行筛选和处理。

下面是一个小示例,它仅打印包含流式传输的聊天模型输出的事件

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-3-sonnet-20240229")

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async for event in chain.astream_events({"topic": "parrot"}, version="v2"):
kind = event["event"]
if kind == "on_chat_model_stream":
print(event, end="|", flush=True)

你可以把它大致理解为一个回调事件的迭代器(尽管格式有所不同)- 并且你几乎可以在所有 LangChain 组件上使用它!

请参阅本指南,了解有关如何使用 .astream_events() 的更详细信息,包括列出可用事件的表格。

向流写入自定义数据

要向流写入自定义数据,你需要根据你正在使用的组件选择以下方法之一

  1. LangGraph 的 StreamWriter 可用于写入将在使用 LangGraph 时通过 streamastream API 显示的自定义数据。 重要提示 这是 LangGraph 的一个特性,因此在使用纯 LCEL 时不可用。 请参阅如何流式传输自定义数据了解更多信息。
  2. dispatch_events / adispatch_events 可用于写入将通过 astream_events API 显示的自定义数据。 请参阅如何调度自定义回调事件了解更多信息。

“自动流式传输”聊天模型

LangChain 通过在某些情况下自动启用流式传输模式来简化聊天模型的流式传输,即使你没有显式调用流式传输方法也是如此。当你使用非流式传输的 invoke 方法但仍然希望流式传输整个应用程序(包括聊天模型的中间结果)时,这特别有用。

工作原理

当你在聊天模型上调用 invoke(或 ainvoke)方法时,如果 LangChain 检测到你正在尝试流式传输整个应用程序,它将自动切换到流式传输模式。

在底层,它将让 invoke(或 ainvoke)使用 stream(或 astream)方法来生成其输出。 就使用 invoke 的代码而言,调用的结果将是相同的; 但是,在聊天模型被流式传输时,LangChain 将负责在 LangChain 的回调系统中调用 on_llm_new_token 事件。 这些回调事件允许 LangGraph stream/astreamastream_events 实时显示聊天模型的输出。

示例

def node(state):
...
# The code below uses the invoke method, but LangChain will
# automatically switch to streaming mode
# when it detects that the overall
# application is being streamed.
ai_message = model.invoke(state["messages"])
...

for chunk in compiled_graph.stream(..., mode="messages"):
...

异步编程

LangChain 提供了许多方法的同步 (sync) 和异步 (async) 版本。异步方法通常以 "a" 为前缀(例如,ainvokeastream)。在编写异步代码时,始终如一地使用这些异步方法至关重要,以确保非阻塞行为和最佳性能。

如果流式传输的数据未能实时显示,请确保你为你的工作流程使用了正确的异步方法。

请查看LangChain 中的异步编程指南,了解有关使用 LangChain 编写异步代码的更多信息。

请参阅以下操作指南,了解 LangChain 中流式传输的具体示例

有关向流写入自定义数据的信息,请参阅以下资源


此页面是否对您有帮助?