跳到主要内容
Open on GitHub

流式传输

流式传输对于增强基于大型语言模型(LLM)的应用程序的响应能力至关重要。通过渐进式显示输出,甚至在完整响应就绪之前,流式传输显著改善了用户体验(UX),尤其是在处理大型语言模型延迟时。

概述

大型语言模型(LLM)生成完整响应通常会产生几秒钟的延迟,在涉及多次模型调用的复杂应用程序中,这种延迟会更加明显。幸运的是,大型语言模型以迭代方式生成响应,允许在生成过程中显示中间结果。通过流式传输这些中间输出,LangChain 可以在基于大型语言模型的应用程序中实现更流畅的用户体验,并在其核心设计中提供了对流式传输的内置支持。

在本指南中,我们将讨论大型语言模型应用程序中的流式传输,并探讨 LangChain 的流式传输 API 如何促进应用程序中各种组件的实时输出。

在大型语言模型应用程序中应该流式传输什么

在涉及大型语言模型的应用程序中,可以通过流式传输多种类型的数据来减少感知延迟并提高透明度,从而改善用户体验。这些包括:

1. 流式传输大型语言模型输出

最常见和最关键的流式传输数据是大型语言模型自身生成的输出。大型语言模型通常需要时间才能生成完整响应,通过实时流式传输输出,用户可以在生成过程中看到部分结果。这提供了即时反馈,并有助于减少用户的等待时间。

2. 流式传输管道或工作流进度

除了仅仅流式传输大型语言模型输出之外,流式传输通过更复杂工作流或管道的进度也很有用,让用户了解应用程序的整体进展情况。这可能包括:

  • 在 LangGraph 工作流中:使用LangGraph,工作流由表示各种步骤的节点和边组成。这里的流式传输涉及跟踪图状态的变化,因为单个节点会请求更新。这允许对工作流中当前活动的节点进行更细粒度的监控,从而在工作流进展到不同阶段时提供实时状态更新。

  • 在 LCEL 管道中:LCEL管道流式传输更新涉及捕获单个子可运行组件的进度。例如,当管道的不同步骤或组件执行时,您可以流式传输当前正在运行的子可运行组件,提供对整个管道进度的实时洞察。

流式传输管道或工作流进度对于向用户提供应用程序在执行过程中所处位置的清晰视图至关重要。

3. 流式传输自定义数据

在某些情况下,您可能需要流式传输自定义数据,这些数据超出了管道或工作流结构所提供的信息。这种自定义信息被注入到工作流中的特定步骤中,无论该步骤是工具还是 LangGraph 节点。例如,您可以实时流式传输关于工具正在做什么的更新,或者通过 LangGraph 节点的进度。这种粒度数据直接从步骤内部发出,为工作流的执行提供了更详细的洞察,在需要更高可见性的复杂过程中尤其有用。

流式传输 API

LangChain 提供了两个主要的 API 用于实时流式传输输出。这些 API 受任何实现可运行接口的组件支持,包括大型语言模型(LLM)已编译的 LangGraph 图,以及任何使用LCEL生成的可运行组件。

  1. 同步 stream 和异步 astream:用于在生成时流式传输单个可运行组件(例如,聊天模型)的输出,或者流式传输任何使用 LangGraph 创建的工作流。
  2. 仅限异步的 astream_events:使用此 API 可以访问完全使用LCEL构建的大型语言模型应用程序中的自定义事件和中间输出。请注意,此 API 可用,但在使用 LangGraph 时并非必需。
注意

此外,还有一个遗留的异步 astream_log API。不推荐在新项目中使用此 API,因为它比其他流式传输 API 更复杂且功能较少。

stream()astream()

stream() 方法返回一个迭代器,该迭代器以同步方式在输出生成时逐块生成输出。您可以使用 for 循环实时处理每个块。例如,在使用大型语言模型时,这允许输出在生成时增量流式传输,从而减少用户的等待时间。

stream()astream() 方法生成的块类型取决于正在进行流式传输的组件。例如,当从大型语言模型流式传输时,每个组件都将是AIMessageChunk;但是,对于其他组件,块可能不同。

stream() 方法返回一个迭代器,该迭代器在这些块生成时逐个生成它们。例如,

for chunk in component.stream(some_input):
# IMPORTANT: Keep the processing of each chunk as efficient as possible.
# While you're processing the current chunk, the upstream component is
# waiting to produce the next one. For example, if working with LangGraph,
# graph execution is paused while the current chunk is being processed.
# In extreme cases, this could even result in timeouts (e.g., when llm outputs are
# streamed from an API that has a timeout).
print(chunk)

异步版本,即 astream(),工作方式类似,但专为非阻塞工作流设计。您可以在异步代码中使用它来实现相同的实时流式传输行为。

与聊天模型一起使用

当与聊天模型一起使用 stream()astream() 时,输出会以AIMessageChunk的形式流式传输,因为它们是由大型语言模型生成的。这允许您在大型语言模型输出生成时逐步呈现或处理它们,这在交互式应用程序或界面中特别有用。

与 LangGraph 一起使用

LangGraph 编译的图是可运行组件,并支持标准的流式传输 API。

当与 LangGraph 一起使用 streamastream 方法时,您可以选择一种或多种流式传输模式,它们允许您控制流式传输的输出类型。可用的流式传输模式有:

  • "values":为每个步骤发出状态的所有值。
  • "updates":仅发出每个步骤后由节点返回的节点名称和更新。
  • "debug":为每个步骤发出调试事件。
  • "messages"逐词发出大型语言模型消息
  • "custom":发出使用LangGraph 的 StreamWriter 写入的自定义输出。

欲了解更多信息,请参阅

与 LCEL 一起使用

如果您使用LangChain 表达式语言 (LCEL)组合多个可运行组件,则 stream()astream() 方法通常会流式传输链中最后一步的输出。这允许最终处理结果以增量方式流式传输。LCEL 尝试优化管道中的流式传输延迟,以便尽快获得最后一步的流式传输结果。

astream_events

提示

使用 astream_events API 访问完全使用LCEL构建的大型语言模型应用程序中的自定义数据和中间输出。

尽管此 API 也可与LangGraph一起使用,但在使用 LangGraph 时通常不需要它,因为 streamastream 方法为 LangGraph 图提供了全面的流式传输功能。

对于使用LCEL构建的链,.stream() 方法仅流式传输链中最后一步的输出。这对于某些应用程序可能已足够,但随着您构建包含多个大型语言模型调用的更复杂链,您可能希望将链的中间值与最终输出一起使用。例如,在构建基于文档的聊天应用程序时,您可能希望在最终生成的同时返回源信息。

可以使用回调来完成此操作,或者通过构建您的链,使其能够通过类似链式.assign()调用的方式将中间值传递到末尾,但 LangChain 还包含一个 .astream_events() 方法,它结合了回调的灵活性和 .stream() 的人体工程学优势。调用时,它会返回一个迭代器,该迭代器会生成各种类型的事件,您可以根据项目需求对其进行过滤和处理。

这是一个小例子,它只打印包含流式聊天模型输出的事件

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-3-sonnet-20240229")

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async for event in chain.astream_events({"topic": "parrot"}):
kind = event["event"]
if kind == "on_chat_model_stream":
print(event, end="|", flush=True)

您可以大致将其视为一个回调事件的迭代器(尽管格式有所不同)——并且您可以在几乎所有 LangChain 组件上使用它!

请参阅此指南,了解有关如何使用 .astream_events() 的更详细信息,包括列出可用事件的表格。

将自定义数据写入流

要将自定义数据写入流,您需要根据正在使用的组件选择以下方法之一:

  1. LangGraph 的StreamWriter 可用于写入自定义数据,这些数据在使用 LangGraph 时将通过streamastream API 显示。重要提示:这是 LangGraph 的一个功能,因此在纯 LCEL 环境中不可用。参阅如何流式传输自定义数据了解更多信息。
  2. dispatch_events / adispatch_events 可用于写入自定义数据,这些数据将通过astream_events API 显示。参阅如何分派自定义回调事件了解更多信息。

“自动流式传输”聊天模型

LangChain 通过在某些情况下自动启用流式传输模式来简化聊天模型的流式传输,即使您没有显式调用流式传输方法。这在您使用非流式传输的 invoke 方法但仍希望流式传输整个应用程序(包括聊天模型的中间结果)时特别有用。

工作原理

当您在聊天模型上调用 invoke(或 ainvoke)方法时,如果 LangChain 检测到您正在尝试流式传输整个应用程序,它将自动切换到流式传输模式。

在底层,它将使 invoke(或 ainvoke)使用 stream(或 astream)方法来生成其输出。就使用 invoke 的代码而言,调用的结果将是相同的;然而,当聊天模型进行流式传输时,LangChain 将负责在 LangChain 的回调系统中调用 on_llm_new_token 事件。这些回调事件允许 LangGraph 的 stream/astreamastream_events 实时显示聊天模型的输出。

示例

def node(state):
...
# The code below uses the invoke method, but LangChain will
# automatically switch to streaming mode
# when it detects that the overall
# application is being streamed.
ai_message = model.invoke(state["messages"])
...

for chunk in compiled_graph.stream(..., mode="messages"):
...

异步编程

LangChain 提供了许多方法的同步(sync)和异步(async)版本。异步方法通常以字母“a”作为前缀(例如,ainvokeastream)。在编写异步代码时,持续使用这些异步方法对于确保非阻塞行为和最佳性能至关重要。

如果流式传输数据未能实时显示,请确保您的工作流使用了正确的异步方法。

请参阅LangChain 中的异步编程指南,了解如何使用 LangChain 编写异步代码的更多信息。

请参阅以下操作指南,了解 LangChain 中流式传输的具体示例:

要将自定义数据写入流,请参阅以下资源: