跳到主要内容
Open on GitHub

流式传输

流式传输对于增强构建在 LLM 之上的应用程序的响应性至关重要。通过逐步显示输出,即使在完整响应准备好之前,流式传输也能显着改善用户体验 (UX),尤其是在处理 LLM 的延迟时。

概述

LLM 生成完整响应通常会产生几秒钟的延迟,这在具有多个模型调用的复杂应用程序中变得更加明显。幸运的是,LLM 迭代地生成响应,从而允许在生成中间结果时显示它们。通过流式传输这些中间输出,LangChain 可以在 LLM 驱动的应用程序中实现更流畅的 UX,并在其核心设计中提供对流式传输的内置支持。

在本指南中,我们将讨论 LLM 应用程序中的流式传输,并探讨 LangChain 的流式传输 API 如何促进应用程序中各种组件的实时输出。

在 LLM 应用程序中流式传输什么

在涉及 LLM 的应用程序中,可以流式传输几种类型的数据,以通过减少感知延迟和提高透明度来改善用户体验。这些包括

1. 流式传输 LLM 输出

最常见和最关键的流式传输数据是 LLM 本身生成的输出。LLM 通常需要时间来生成完整响应,通过实时流式传输输出,用户可以在生成部分结果时看到它们。这提供了即时反馈,并有助于减少用户的等待时间。

2. 流式传输管道或工作流程进度

除了流式传输 LLM 输出之外,流式传输更复杂的工作流程或管道的进度也很有用,这让用户了解应用程序的整体进度。这可能包括

  • 在 LangGraph 工作流程中: 使用 LangGraph,工作流程由代表各种步骤的节点和边组成。此处的流式传输涉及跟踪 图状态 的更改,因为各个 节点 请求更新。这允许更精细地监控工作流程中当前处于活动状态的节点,从而提供有关工作流程在不同阶段进行时状态的实时更新。

  • 在 LCEL 管道中:LCEL 管道流式传输更新涉及捕获来自各个 子 runnable 的进度。例如,随着管道的不同步骤或组件执行,您可以流式传输当前正在运行的子 runnable,从而提供对整个管道进度的实时洞察。

流式传输管道或工作流程进度对于向用户清晰地展示应用程序在执行过程中的位置至关重要。

3. 流式传输自定义数据

在某些情况下,您可能需要流式传输 自定义数据,这些数据超出了管道或工作流程结构提供的信息。此自定义信息注入到工作流程中的特定步骤中,无论该步骤是工具还是 LangGraph 节点。例如,您可以流式传输有关工具实时执行的操作或 LangGraph 节点进度的更新。这种细粒度数据直接从步骤内部发出,可更详细地了解工作流程的执行情况,并且在需要更高可见性的复杂流程中尤其有用。

流式传输 API

LangChain 有两个主要的 API 用于实时流式传输输出。任何实现 Runnable 接口 的组件都支持这些 API,包括 LLM编译的 LangGraph 图 以及使用 LCEL 生成的任何 Runnable。

  1. 同步 stream 和异步 astream:用于在生成时流式传输来自各个 Runnables(例如,聊天模型)的输出,或流式传输使用 LangGraph 创建的任何工作流程。
  2. 仅异步 astream_events:使用此 API 可以访问完全使用 LCEL 构建的 LLM 应用程序中的自定义事件和中间输出。请注意,此 API 可用,但在使用 LangGraph 时不需要。
注意

此外,还有一个旧版异步 astream_log API。不建议在新项目中使用此 API,因为它比其他流式传输 API 更复杂且功能更少。

stream()astream()

stream() 方法返回一个迭代器,该迭代器同步生成输出块,因为它们是生成的。您可以使用 for 循环实时处理每个块。例如,当使用 LLM 时,这允许在生成输出时以增量方式流式传输输出,从而减少用户的等待时间。

stream()astream() 方法生成的块的类型取决于正在流式传输的组件。例如,当从 LLM 流式传输时,每个组件都将是一个 AIMessageChunk;但是,对于其他组件,块可能会有所不同。

stream() 方法返回一个迭代器,该迭代器在生成这些块时生成它们。例如,

for chunk in component.stream(some_input):
# IMPORTANT: Keep the processing of each chunk as efficient as possible.
# While you're processing the current chunk, the upstream component is
# waiting to produce the next one. For example, if working with LangGraph,
# graph execution is paused while the current chunk is being processed.
# In extreme cases, this could even result in timeouts (e.g., when llm outputs are
# streamed from an API that has a timeout).
print(chunk)

异步版本 astream() 的工作方式类似,但专为非阻塞工作流程而设计。您可以在异步代码中使用它来实现相同的实时流式传输行为。

与聊天模型一起使用

当将 stream()astream() 与聊天模型一起使用时,输出将作为 AIMessageChunk 流式传输,因为它是由 LLM 生成的。这允许您在 LLM 的输出生成时以增量方式呈现或处理它,这在交互式应用程序或界面中特别有用。

与 LangGraph 一起使用

LangGraph 编译的图是 Runnables,并支持标准流式传输 API。

当将 streamastream 方法与 LangGraph 一起使用时,您可以选择 一个或多个 流式传输模式,这些模式允许您控制流式传输的输出类型。可用的流式传输模式为

  • “values”:为每个步骤发出 状态 的所有值。
  • “updates”:仅在每个步骤之后发出节点名称和节点返回的更新。
  • “debug”:为每个步骤发出调试事件。
  • “messages”逐令牌 发出 LLM 消息
  • “custom”:发出使用 LangGraph 的 StreamWriter 编写的自定义输出。

有关更多信息,请参阅

与 LCEL 一起使用

如果您使用 LangChain 的表达式语言 (LCEL) 组合多个 Runnables,则 stream()astream() 方法将按照惯例流式传输链中最后一步的输出。这允许以增量方式流式传输最终处理结果。LCEL 尝试优化管道中的流式传输延迟,以便尽快获得最后一步的流式传输结果。

astream_events

提示

使用 astream_events API 可以访问完全使用 LCEL 构建的 LLM 应用程序中的自定义数据和中间输出。

虽然此 API 也可用于 LangGraph,但在使用 LangGraph 时通常没有必要,因为 streamastream 方法为 LangGraph 图提供了全面的流式传输功能。

对于使用 LCEL 构建的链,.stream() 方法仅流式传输链中最后一步的输出。这对于某些应用程序可能已足够,但随着您构建更复杂的由多个 LLM 调用组成的链,您可能希望在最终输出旁边使用链的中间值。例如,在构建基于文档的聊天应用程序时,您可能希望返回来源以及最终生成结果。

有一些方法可以 使用回调 来做到这一点,或者通过构造您的链,使其使用类似链接的 .assign() 调用将中间值传递到末尾,但 LangChain 还包含一个 .astream_events() 方法,该方法结合了回调的灵活性和 .stream() 的人体工程学。调用时,它返回一个迭代器,该迭代器生成 各种类型的事件,您可以根据项目的需要对其进行筛选和处理。

这是一个小示例,仅打印包含流式传输的聊天模型输出的事件

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-3-sonnet-20240229")

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async for event in chain.astream_events({"topic": "parrot"}):
kind = event["event"]
if kind == "on_chat_model_stream":
print(event, end="|", flush=True)

您可以大致将其视为回调事件的迭代器(尽管格式不同) - 您可以在几乎所有 LangChain 组件上使用它!

有关如何使用 .astream_events() 的更多详细信息,包括列出可用事件的表格,请参阅 本指南

将自定义数据写入流

要将自定义数据写入流,您需要根据您正在使用的组件选择以下方法之一

  1. LangGraph 的 StreamWriter 可用于写入自定义数据,这些数据将在使用 LangGraph 时通过 streamastream API 显示。重要提示 这是一个 LangGraph 功能,因此在使用纯 LCEL 时不可用。有关更多信息,请参阅 如何流式传输自定义数据
  2. dispatch_events / adispatch_events 可用于写入自定义数据,这些数据将通过 astream_events API 显示。有关更多信息,请参阅 如何分派自定义回调事件

“自动流式传输”聊天模型

LangChain 通过在某些情况下自动启用流式传输模式,简化了从 聊天模型 进行流式传输的过程,即使您没有显式调用流式传输方法也是如此。当您使用非流式传输 invoke 方法但仍想流式传输整个应用程序(包括聊天模型的中间结果)时,这尤其有用。

工作原理

当您在聊天模型上调用 invoke(或 ainvoke)方法时,如果 LangChain 检测到您正在尝试流式传输整个应用程序,它将自动切换到流式传输模式。

在幕后,它将让 invoke(或 ainvoke)使用 stream(或 astream)方法来生成其输出。调用的结果与使用 invoke 的代码有关;但是,在流式传输聊天模型时,LangChain 将负责在 LangChain 的 回调系统 中调用 on_llm_new_token 事件。这些回调事件允许 LangGraph stream/astreamastream_events 实时显示聊天模型的输出。

示例

def node(state):
...
# The code below uses the invoke method, but LangChain will
# automatically switch to streaming mode
# when it detects that the overall
# application is being streamed.
ai_message = model.invoke(state["messages"])
...

for chunk in compiled_graph.stream(..., mode="messages"):
...

异步编程

LangChain 提供许多方法的同步(sync)和异步(async)版本。异步方法通常以“a”为前缀(例如,ainvokeastream)。在编写异步代码时,始终如一地使用这些异步方法以确保非阻塞行为和最佳性能至关重要。

如果流式传输数据未能实时显示,请确保您为工作流程使用了正确的异步方法。

请查看 LangChain 中的异步编程指南,以获取有关使用 LangChain 编写异步代码的更多信息。

请参阅以下操作指南,以获取 LangChain 中流式传输的具体示例

有关将自定义数据写入流的信息,请参阅以下资源


此页是否对您有帮助?