跳转到主要内容
Open In ColabOpen on GitHub

如何流式传输可运行对象

先决条件

本指南假定您熟悉以下概念

流式传输对于使基于 LLM 的应用程序对最终用户感觉响应迅速至关重要。

重要的 LangChain 原语,如 聊天模型输出解析器提示检索器代理 实现了 LangChain Runnable 接口

此接口提供两种通用的流式传输内容的方法

  1. 同步 stream 和异步 astream:流式传输的默认实现,用于流式传输来自链的最终输出
  2. 异步 astream_events 和异步 astream_log:这些方法提供了一种流式传输来自链的中间步骤最终输出的方式。

让我们看看这两种方法,并尝试了解如何使用它们。

信息

有关 LangChain 中流式传输技术的更高级概述,请参阅概念指南的此部分

使用 Stream

所有 Runnable 对象都实现了一个名为 stream 的同步方法和一个名为 astream 的异步变体。

这些方法旨在以块的形式流式传输最终输出,并在每个块可用时立即生成它。

只有当程序中的所有步骤都知道如何处理输入流时,流式传输才有可能;即,一次处理一个输入块,并生成相应的输出块。

此处理的复杂性可能会有所不同,从像发出 LLM 生成的令牌这样的简单任务,到更具挑战性的任务,例如在整个 JSON 完成之前流式传输 JSON 结果的各个部分。

开始探索流式传输的最佳位置是 LLM 应用中最重要​​的组件——LLM 本身!

LLM 和聊天模型

大型语言模型及其聊天变体是基于 LLM 的应用中的主要瓶颈。

大型语言模型可能需要几秒钟才能生成对查询的完整响应。这比应用程序对最终用户感觉响应迅速的 ~200-300 毫秒阈值慢得多。

使应用程序感觉更具响应性的关键策略是显示中间进度;即,逐个令牌流式传输模型的输出。

我们将展示使用聊天模型进行流式传输的示例。从下面的选项中选择一个

pip install -qU "langchain[openai]"
import getpass
import os

if not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

from langchain.chat_models import init_chat_model

model = init_chat_model("gpt-4o-mini", model_provider="openai")

让我们从同步 stream API 开始

chunks = []
for chunk in model.stream("what color is the sky?"):
chunks.append(chunk)
print(chunk.content, end="|", flush=True)
The| sky| appears| blue| during| the| day|.|

或者,如果您在异步环境中工作,您可以考虑使用异步 astream API

chunks = []
async for chunk in model.astream("what color is the sky?"):
chunks.append(chunk)
print(chunk.content, end="|", flush=True)
The| sky| appears| blue| during| the| day|.|

让我们检查其中一个块

chunks[0]
AIMessageChunk(content='The', id='run-b36bea64-5511-4d7a-b6a3-a07b3db0c8e7')

我们得到了一些名为 AIMessageChunk 的东西。此块表示 AIMessage 的一部分。

消息块在设计上是累加的——只需将它们加起来即可获得到目前为止的响应状态!

chunks[0] + chunks[1] + chunks[2] + chunks[3] + chunks[4]
AIMessageChunk(content='The sky appears blue during', id='run-b36bea64-5511-4d7a-b6a3-a07b3db0c8e7')

几乎所有 LLM 应用程序都涉及比仅调用语言模型更多的步骤。

让我们使用 LangChain 表达式语言 (LCEL) 构建一个简单的链,它结合了提示、模型和解析器,并验证流式传输是否有效。

我们将使用 StrOutputParser 来解析模型的输出。这是一个简单的解析器,它从 AIMessageChunk 中提取 content 字段,从而为我们提供模型返回的 token

提示

LCEL 是一种声明式方式,通过链接不同的 LangChain 原语来指定“程序”。使用 LCEL 创建的链受益于 streamastream 的自动实现,从而允许流式传输最终输出。实际上,使用 LCEL 创建的链实现了整个标准 Runnable 接口。

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async for chunk in chain.astream({"topic": "parrot"}):
print(chunk, end="|", flush=True)
Here|'s| a| joke| about| a| par|rot|:|

A man| goes| to| a| pet| shop| to| buy| a| par|rot|.| The| shop| owner| shows| him| two| stunning| pa|rr|ots| with| beautiful| pl|um|age|.|

"|There|'s| a| talking| par|rot| an|d a| non|-|talking| par|rot|,"| the| owner| says|.| "|The| talking| par|rot| costs| $|100|,| an|d the| non|-|talking| par|rot| is| $|20|."|

The| man| says|,| "|I|'ll| take| the| non|-|talking| par|rot| at| $|20|."|

He| pays| an|d leaves| with| the| par|rot|.| As| he|'s| walking| down| the| street|,| the| par|rot| looks| up| at| him| an|d says|,| "|You| know|,| you| really| are| a| stupi|d man|!"|

The| man| is| stun|ne|d an|d looks| at| the| par|rot| in| dis|bel|ief|.| The| par|rot| continues|,| "|Yes|,| you| got| r|ippe|d off| big| time|!| I| can| talk| just| as| well| as| that| other| par|rot|,| an|d you| only| pai|d $|20| |for| me|!"|

请注意,即使我们在上面的链的末尾使用了 parser,我们仍然获得了流式输出。 parser 单独处理每个流式块。许多 LCEL 原语 也支持这种转换风格的直通流式传输,这在构建应用程序时非常方便。

自定义函数可以设计为返回生成器,这些生成器能够处理流。

某些可运行对象,例如 提示模板聊天模型,无法处理单个块,而是聚合所有先前的步骤。此类可运行对象可能会中断流式传输过程。

注意

LangChain 表达式语言允许您将链的构造与使用它的模式(例如,同步/异步、批量/流式传输等)分开。如果这与您正在构建的内容无关,您还可以依赖标准的命令式编程方法,通过单独调用每个组件上的 invokebatchstream,将结果分配给变量,然后在下游根据需要使用它们。

使用输入流

如果您想从输出中流式传输 JSON,因为它正在生成,该怎么办?

如果您依赖 json.loads 来解析部分 json,则解析将失败,因为部分 json 不是有效的 json。

您可能会完全不知所措,并声称不可能流式传输 JSON。

好吧,事实证明有一种方法可以做到——解析器需要对输入流进行操作,并尝试将部分 json“自动完成”为有效状态。

让我们看看这样的解析器在操作中的情况,以了解这意味着什么。

from langchain_core.output_parsers import JsonOutputParser

chain = (
model | JsonOutputParser()
) # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models
async for text in chain.astream(
"output a list of the countries france, spain and japan and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`"
):
print(text, flush=True)
API 参考:JsonOutputParser
{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 67}]}
{'countries': [{'name': 'France', 'population': 67413}]}
{'countries': [{'name': 'France', 'population': 67413000}]}
{'countries': [{'name': 'France', 'population': 67413000}, {}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan'}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan', 'population': 125}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan', 'population': 125584}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan', 'population': 125584000}]}

现在,让我们打破流式传输。我们将使用之前的示例,并在末尾附加一个提取函数,该函数从最终的 JSON 中提取国家名称。

警告

链中任何对最终输入而不是对输入流进行操作的步骤都可能通过 streamastream 中断流式传输功能。

提示

稍后,我们将讨论 astream_events API,该 API 流式传输来自中间步骤的结果。即使链包含仅对最终输入进行操作的步骤,此 API 仍将流式传输来自中间步骤的结果。

from langchain_core.output_parsers import (
JsonOutputParser,
)


# A function that operates on finalized inputs
# rather than on an input_stream
def _extract_country_names(inputs):
"""A function that does not operates on input streams and breaks streaming."""
if not isinstance(inputs, dict):
return ""

if "countries" not in inputs:
return ""

countries = inputs["countries"]

if not isinstance(countries, list):
return ""

country_names = [
country.get("name") for country in countries if isinstance(country, dict)
]
return country_names


chain = model | JsonOutputParser() | _extract_country_names

async for text in chain.astream(
"output a list of the countries france, spain and japan and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`"
):
print(text, end="|", flush=True)
API 参考:JsonOutputParser
['France', 'Spain', 'Japan']|

生成器函数

让我们使用可以对输入流进行操作的生成器函数来修复流式传输。

提示

生成器函数(使用 yield 的函数)允许编写对输入流进行操作的代码

from langchain_core.output_parsers import JsonOutputParser


async def _extract_country_names_streaming(input_stream):
"""A function that operates on input streams."""
country_names_so_far = set()

async for input in input_stream:
if not isinstance(input, dict):
continue

if "countries" not in input:
continue

countries = input["countries"]

if not isinstance(countries, list):
continue

for country in countries:
name = country.get("name")
if not name:
continue
if name not in country_names_so_far:
yield name
country_names_so_far.add(name)


chain = model | JsonOutputParser() | _extract_country_names_streaming

async for text in chain.astream(
"output a list of the countries france, spain and japan and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`",
):
print(text, end="|", flush=True)
API 参考:JsonOutputParser
France|Spain|Japan|
注意

由于上面的代码依赖于 JSON 自动完成,您可能会看到国家名称的一部分(例如,SpSpain),这对于提取结果来说不是人们想要的!

我们专注于流式传输概念,而不一定关注链的结果。

非流式传输组件

某些内置组件(如检索器)不提供任何 streaming。如果我们尝试 stream 它们会发生什么? 🤨

from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings

template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

vectorstore = FAISS.from_texts(
["harrison worked at kensho", "harrison likes spicy food"],
embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()

chunks = [chunk for chunk in retriever.stream("where did harrison work?")]
chunks
[[Document(page_content='harrison worked at kensho'),
Document(page_content='harrison likes spicy food')]]

Stream 只是生成了该组件的最终结果。

这没问题 🥹!并非所有组件都必须实现流式传输——在某些情况下,流式传输要么是不必要的,要么是困难的,要么只是没有意义。

提示

使用非流式传输组件构建的 LCEL 链,在许多情况下仍然能够流式传输,部分输出的流式传输在链中的最后一个非流式传输步骤之后开始。

retrieval_chain = (
{
"context": retriever.with_config(run_name="Docs"),
"question": RunnablePassthrough(),
}
| prompt
| model
| StrOutputParser()
)
for chunk in retrieval_chain.stream(
"Where did harrison work? " "Write 3 made up sentences about this place."
):
print(chunk, end="|", flush=True)
Base|d on| the| given| context|,| Harrison| worke|d at| K|ens|ho|.|

Here| are| |3| |made| up| sentences| about| this| place|:|

1|.| K|ens|ho| was| a| cutting|-|edge| technology| company| known| for| its| innovative| solutions| in| artificial| intelligence| an|d data| analytics|.|

2|.| The| modern| office| space| at| K|ens|ho| feature|d open| floor| plans|,| collaborative| work|sp|aces|,| an|d a| vib|rant| atmosphere| that| fos|tere|d creativity| an|d team|work|.|

3|.| With| its| prime| location| in| the| heart| of| the| city|,| K|ens|ho| attracte|d top| talent| from| aroun|d the| worl|d,| creating| a| diverse| an|d dynamic| work| environment|.|

现在我们已经了解了 streamastream 的工作原理,让我们冒险进入流式传输事件的世界。 🏞️

使用流事件

事件流式传输是一个 beta API。此 API 可能会根据反馈进行少量更改。

注意

本指南演示了 V2 API,需要 langchain-core >= 0.2。对于与旧版本 LangChain 兼容的 V1 API,请参阅 此处

import langchain_core

langchain_core.__version__

为了使 astream_events API 正常工作

  • 尽可能在整个代码中使用 async(例如,异步工具等)
  • 如果在定义自定义函数/可运行对象时传播回调
  • 每当使用没有 LCEL 的可运行对象时,请确保在 LLM 上调用 .astream() 而不是 .ainvoke 以强制 LLM 流式传输令牌。
  • 如果任何事情没有按预期工作,请告诉我们! :)

事件参考

下面是一个参考表,显示了各种 Runnable 对象可能发出的一些事件。

注意

当流式传输正确实现时,可运行对象的输入将在输入流完全消耗后才可知。这意味着 inputs 通常仅包含在 end 事件中,而不是 start 事件中。

事件名称输入输出
on_chat_model_start[模型名称]{"messages": [[SystemMessage, HumanMessage]]}
on_chat_model_stream[模型名称]AIMessageChunk(content="你好")
on_chat_model_end[模型名称]{"messages": [[SystemMessage, HumanMessage]]}AIMessageChunk(content="你好世界")
on_llm_start[模型名称]{'input': '你好'}
on_llm_stream[模型名称]'你好'
on_llm_end[模型名称]'你好,人类!'
on_chain_startformat_docs
on_chain_streamformat_docs"你好世界!,再见世界!"
on_chain_endformat_docs[Document(...)]"你好世界!,再见世界!"
on_tool_startsome_tool{"x": 1, "y": "2"}
on_tool_endsome_tool{"x": 1, "y": "2"}
on_retriever_start[检索器名称]{"query": "你好"}
on_retriever_end[检索器名称]{"query": "你好"}[Document(...), ..]
on_prompt_start[template_name]{"question": "你好"}
on_prompt_end[template_name]{"question": "你好"}ChatPromptValue(messages: [SystemMessage, ...])

聊天模型

让我们首先看看聊天模型生成的事件。

events = []
async for event in model.astream_events("hello"):
events.append(event)
注意

对于 langchain-core<0.3.37,显式设置 version kwarg(例如,model.astream_events("hello", version="v2"))。

让我们看看一些开始事件和一些结束事件。

events[:3]
[{'event': 'on_chat_model_start',
'data': {'input': 'hello'},
'name': 'ChatAnthropic',
'tags': [],
'run_id': 'b18d016d-8b9b-49e7-a555-44db498fcf66',
'metadata': {'ls_provider': 'anthropic',
'ls_model_name': 'claude-3-sonnet-20240229',
'ls_model_type': 'chat',
'ls_temperature': 0.0,
'ls_max_tokens': 1024},
'parent_ids': []},
{'event': 'on_chat_model_stream',
'run_id': 'b18d016d-8b9b-49e7-a555-44db498fcf66',
'name': 'ChatAnthropic',
'tags': [],
'metadata': {'ls_provider': 'anthropic',
'ls_model_name': 'claude-3-sonnet-20240229',
'ls_model_type': 'chat',
'ls_temperature': 0.0,
'ls_max_tokens': 1024},
'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-b18d016d-8b9b-49e7-a555-44db498fcf66', usage_metadata={'input_tokens': 8, 'output_tokens': 4, 'total_tokens': 12, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})},
'parent_ids': []},
{'event': 'on_chat_model_stream',
'run_id': 'b18d016d-8b9b-49e7-a555-44db498fcf66',
'name': 'ChatAnthropic',
'tags': [],
'metadata': {'ls_provider': 'anthropic',
'ls_model_name': 'claude-3-sonnet-20240229',
'ls_model_type': 'chat',
'ls_temperature': 0.0,
'ls_max_tokens': 1024},
'data': {'chunk': AIMessageChunk(content='Hello! How can', additional_kwargs={}, response_metadata={}, id='run-b18d016d-8b9b-49e7-a555-44db498fcf66')},
'parent_ids': []}]
events[-2:]
[{'event': 'on_chat_model_stream',
'run_id': 'b18d016d-8b9b-49e7-a555-44db498fcf66',
'name': 'ChatAnthropic',
'tags': [],
'metadata': {'ls_provider': 'anthropic',
'ls_model_name': 'claude-3-sonnet-20240229',
'ls_model_type': 'chat',
'ls_temperature': 0.0,
'ls_max_tokens': 1024},
'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={'stop_reason': 'end_turn', 'stop_sequence': None}, id='run-b18d016d-8b9b-49e7-a555-44db498fcf66', usage_metadata={'input_tokens': 0, 'output_tokens': 12, 'total_tokens': 12, 'input_token_details': {}})},
'parent_ids': []},
{'event': 'on_chat_model_end',
'data': {'output': AIMessageChunk(content='Hello! How can I assist you today?', additional_kwargs={}, response_metadata={'stop_reason': 'end_turn', 'stop_sequence': None}, id='run-b18d016d-8b9b-49e7-a555-44db498fcf66', usage_metadata={'input_tokens': 8, 'output_tokens': 16, 'total_tokens': 24, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})},
'run_id': 'b18d016d-8b9b-49e7-a555-44db498fcf66',
'name': 'ChatAnthropic',
'tags': [],
'metadata': {'ls_provider': 'anthropic',
'ls_model_name': 'claude-3-sonnet-20240229',
'ls_model_type': 'chat',
'ls_temperature': 0.0,
'ls_max_tokens': 1024},
'parent_ids': []}]

让我们重新审视解析流式 JSON 的示例链,以探索流式事件 API。

chain = (
model | JsonOutputParser()
) # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models

events = [
event
async for event in chain.astream_events(
"output a list of the countries france, spain and japan and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`",
)
]

如果您查看前几个事件,您会注意到有 3 个不同的开始事件,而不是 2 个开始事件。

这三个开始事件对应于

  1. 链(模型 + 解析器)
  2. 模型
  3. 解析器
events[:3]
[{'event': 'on_chain_start',
'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'},
'name': 'RunnableSequence',
'tags': [],
'run_id': '4765006b-16e2-4b1d-a523-edd9fd64cb92',
'metadata': {}},
{'event': 'on_chat_model_start',
'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`')]]}},
'name': 'ChatAnthropic',
'tags': ['seq:step:1'],
'run_id': '0320c234-7b52-4a14-ae4e-5f100949e589',
'metadata': {}},
{'event': 'on_chat_model_stream',
'data': {'chunk': AIMessageChunk(content='{', id='run-0320c234-7b52-4a14-ae4e-5f100949e589')},
'run_id': '0320c234-7b52-4a14-ae4e-5f100949e589',
'name': 'ChatAnthropic',
'tags': ['seq:step:1'],
'metadata': {}}]

您认为如果您查看最后 3 个事件会看到什么?中间呢?

让我们使用此 API 从模型和解析器中输出流事件。我们忽略了开始事件、结束事件和来自链的事件。

num_events = 0

async for event in chain.astream_events(
"output a list of the countries france, spain and japan and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`",
):
kind = event["event"]
if kind == "on_chat_model_stream":
print(
f"Chat model chunk: {repr(event['data']['chunk'].content)}",
flush=True,
)
if kind == "on_parser_stream":
print(f"Parser chunk: {event['data']['chunk']}", flush=True)
num_events += 1
if num_events > 30:
# Truncate the output
print("...")
break
Chat model chunk: ''
Chat model chunk: '{'
Parser chunk: {}
Chat model chunk: '\n "countries'
Chat model chunk: '": [\n '
Parser chunk: {'countries': []}
Chat model chunk: '{\n "'
Parser chunk: {'countries': [{}]}
Chat model chunk: 'name": "France'
Parser chunk: {'countries': [{'name': 'France'}]}
Chat model chunk: '",\n "'
Chat model chunk: 'population": 67'
Parser chunk: {'countries': [{'name': 'France', 'population': 67}]}
Chat model chunk: '413'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413}]}
Chat model chunk: '000\n },'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}]}
Chat model chunk: '\n {'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}, {}]}
Chat model chunk: '\n "name":'
...

由于模型和解析器都支持流式传输,我们实时看到了来自这两个组件的流式传输事件!有点酷,不是吗? 🦜

过滤事件

由于此 API 生成如此多的事件,因此能够过滤事件非常有用。

您可以按组件 name、组件 tags 或组件 type 进行过滤。

按名称

chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
{"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
"output a list of the countries france, spain and japan and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`",
include_names=["my_parser"],
):
print(event)
max_events += 1
if max_events > 10:
# Truncate output
print("...")
break
{'event': 'on_parser_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'my_parser', 'tags': ['seq:step:2'], 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'metadata': {}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': []}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France'}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413000}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413000}, {}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain'}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
...

按类型

chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
{"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
include_types=["chat_model"],
):
print(event)
max_events += 1
if max_events > 10:
# Truncate output
print("...")
break
{'event': 'on_chat_model_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'model', 'tags': ['seq:step:1'], 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c', usage_metadata={'input_tokens': 56, 'output_tokens': 1, 'total_tokens': 57, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\n "countries', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='": [\n ', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{\n "', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='name": "France', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='",\n "', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='population": 67', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='413', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='000\n },', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
...

按标签

注意

标签由给定 runnable 的子组件继承。

如果您使用标签进行过滤,请确保这是您想要的。

chain = (model | JsonOutputParser()).with_config({"tags": ["my_chain"]})

max_events = 0
async for event in chain.astream_events(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
include_tags=["my_chain"],
):
print(event)
max_events += 1
if max_events > 10:
# Truncate output
print("...")
break
{'event': 'on_chain_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'RunnableSequence', 'tags': ['my_chain'], 'run_id': '58d1302e-36ce-4df7-a3cb-47cb73d57e44', 'metadata': {}, 'parent_ids': []}
{'event': 'on_chat_model_start', 'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`', additional_kwargs={}, response_metadata={})]]}}, 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'run_id': '8222e8a1-d978-4f30-87fc-b2dba838774b', 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-8222e8a1-d978-4f30-87fc-b2dba838774b', usage_metadata={'input_tokens': 56, 'output_tokens': 1, 'total_tokens': 57, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})}, 'run_id': '8222e8a1-d978-4f30-87fc-b2dba838774b', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_parser_start', 'data': {}, 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'run_id': '75604c84-e1e6-494a-8b2a-950f45d932e8', 'metadata': {}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', additional_kwargs={}, response_metadata={}, id='run-8222e8a1-d978-4f30-87fc-b2dba838774b')}, 'run_id': '8222e8a1-d978-4f30-87fc-b2dba838774b', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_parser_stream', 'run_id': '75604c84-e1e6-494a-8b2a-950f45d932e8', 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_chain_stream', 'run_id': '58d1302e-36ce-4df7-a3cb-47cb73d57e44', 'name': 'RunnableSequence', 'tags': ['my_chain'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': []}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\n "countries', additional_kwargs={}, response_metadata={}, id='run-8222e8a1-d978-4f30-87fc-b2dba838774b')}, 'run_id': '8222e8a1-d978-4f30-87fc-b2dba838774b', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='": [\n ', additional_kwargs={}, response_metadata={}, id='run-8222e8a1-d978-4f30-87fc-b2dba838774b')}, 'run_id': '8222e8a1-d978-4f30-87fc-b2dba838774b', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_parser_stream', 'run_id': '75604c84-e1e6-494a-8b2a-950f45d932e8', 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'metadata': {}, 'data': {'chunk': {'countries': []}}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_chain_stream', 'run_id': '58d1302e-36ce-4df7-a3cb-47cb73d57e44', 'name': 'RunnableSequence', 'tags': ['my_chain'], 'metadata': {}, 'data': {'chunk': {'countries': []}}, 'parent_ids': []}
...

非流式传输组件

还记得某些组件由于不对输入流进行操作而无法很好地流式传输吗?

虽然此类组件在使用 astream 时可能会中断最终输出的流式传输,但 astream_events 仍将从支持流式传输的中间步骤生成流式传输事件!

# Function that does not support streaming.
# It operates on the finalizes inputs rather than
# operating on the input stream.
def _extract_country_names(inputs):
"""A function that does not operates on input streams and breaks streaming."""
if not isinstance(inputs, dict):
return ""

if "countries" not in inputs:
return ""

countries = inputs["countries"]

if not isinstance(countries, list):
return ""

country_names = [
country.get("name") for country in countries if isinstance(country, dict)
]
return country_names


chain = (
model | JsonOutputParser() | _extract_country_names
) # This parser only works with OpenAI right now

正如预期的那样,astream API 无法正常工作,因为 _extract_country_names 不对流进行操作。

async for chunk in chain.astream(
"output a list of the countries france, spain and japan and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`",
):
print(chunk, flush=True)
['France', 'Spain', 'Japan']

现在,让我们确认使用 astream_events,我们仍然可以看到来自模型和解析器的流式传输输出。

num_events = 0

async for event in chain.astream_events(
"output a list of the countries france, spain and japan and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`",
):
kind = event["event"]
if kind == "on_chat_model_stream":
print(
f"Chat model chunk: {repr(event['data']['chunk'].content)}",
flush=True,
)
if kind == "on_parser_stream":
print(f"Parser chunk: {event['data']['chunk']}", flush=True)
num_events += 1
if num_events > 30:
# Truncate the output
print("...")
break
Chat model chunk: ''
Chat model chunk: '{'
Parser chunk: {}
Chat model chunk: '\n "countries'
Chat model chunk: '": [\n '
Parser chunk: {'countries': []}
Chat model chunk: '{\n "'
Parser chunk: {'countries': [{}]}
Chat model chunk: 'name": "France'
Parser chunk: {'countries': [{'name': 'France'}]}
Chat model chunk: '",\n "'
Chat model chunk: 'population": 67'
Parser chunk: {'countries': [{'name': 'France', 'population': 67}]}
Chat model chunk: '413'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413}]}
Chat model chunk: '000\n },'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}]}
Chat model chunk: '\n {'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}, {}]}
Chat model chunk: '\n "name":'
Chat model chunk: ' "Spain",'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain'}]}
Chat model chunk: '\n "population":'
Chat model chunk: ' 47'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47}]}
Chat model chunk: '351'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351}]}
...

传播回调

注意

如果您在工具中使用调用可运行对象,则需要将回调传播到可运行对象;否则,将不会生成任何流事件。

注意

当使用 RunnableLambdas@chain 装饰器时,回调会在幕后自动传播。

from langchain_core.runnables import RunnableLambda
from langchain_core.tools import tool


def reverse_word(word: str):
return word[::-1]


reverse_word = RunnableLambda(reverse_word)


@tool
def bad_tool(word: str):
"""Custom tool that doesn't propagate callbacks."""
return reverse_word.invoke(word)


async for event in bad_tool.astream_events("hello"):
print(event)
API 参考:RunnableLambda | tool
{'event': 'on_tool_start', 'data': {'input': 'hello'}, 'name': 'bad_tool', 'tags': [], 'run_id': 'ea900472-a8f7-425d-b627-facdef936ee8', 'metadata': {}}
{'event': 'on_chain_start', 'data': {'input': 'hello'}, 'name': 'reverse_word', 'tags': [], 'run_id': '77b01284-0515-48f4-8d7c-eb27c1882f86', 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': 'olleh', 'input': 'hello'}, 'run_id': '77b01284-0515-48f4-8d7c-eb27c1882f86', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
{'event': 'on_tool_end', 'data': {'output': 'olleh'}, 'run_id': 'ea900472-a8f7-425d-b627-facdef936ee8', 'name': 'bad_tool', 'tags': [], 'metadata': {}}

这是一个正确传播回调的重新实现。您会注意到,现在我们也从 reverse_word 可运行对象中获取事件。

@tool
def correct_tool(word: str, callbacks):
"""A tool that correctly propagates callbacks."""
return reverse_word.invoke(word, {"callbacks": callbacks})


async for event in correct_tool.astream_events("hello"):
print(event)
{'event': 'on_tool_start', 'data': {'input': 'hello'}, 'name': 'correct_tool', 'tags': [], 'run_id': 'd5ea83b9-9278-49cc-9f1d-aa302d671040', 'metadata': {}}
{'event': 'on_chain_start', 'data': {'input': 'hello'}, 'name': 'reverse_word', 'tags': [], 'run_id': '44dafbf4-2f87-412b-ae0e-9f71713810df', 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': 'olleh', 'input': 'hello'}, 'run_id': '44dafbf4-2f87-412b-ae0e-9f71713810df', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
{'event': 'on_tool_end', 'data': {'output': 'olleh'}, 'run_id': 'd5ea83b9-9278-49cc-9f1d-aa302d671040', 'name': 'correct_tool', 'tags': [], 'metadata': {}}

如果您从 Runnable Lambdas 或 @chains 中调用可运行对象,则回调将代表您自动传递。

from langchain_core.runnables import RunnableLambda


async def reverse_and_double(word: str):
return await reverse_word.ainvoke(word) * 2


reverse_and_double = RunnableLambda(reverse_and_double)

await reverse_and_double.ainvoke("1234")

async for event in reverse_and_double.astream_events("1234"):
print(event)
API 参考:RunnableLambda
{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_and_double', 'tags': [], 'run_id': '03b0e6a1-3e60-42fc-8373-1e7829198d80', 'metadata': {}}
{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_word', 'tags': [], 'run_id': '5cf26fc8-840b-4642-98ed-623dda28707a', 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': '4321', 'input': '1234'}, 'run_id': '5cf26fc8-840b-4642-98ed-623dda28707a', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
{'event': 'on_chain_stream', 'data': {'chunk': '43214321'}, 'run_id': '03b0e6a1-3e60-42fc-8373-1e7829198d80', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': '43214321'}, 'run_id': '03b0e6a1-3e60-42fc-8373-1e7829198d80', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}

使用 @chain 装饰器

from langchain_core.runnables import chain


@chain
async def reverse_and_double(word: str):
return await reverse_word.ainvoke(word) * 2


await reverse_and_double.ainvoke("1234")

async for event in reverse_and_double.astream_events("1234"):
print(event)
API 参考:chain
{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_and_double', 'tags': [], 'run_id': '1bfcaedc-f4aa-4d8e-beee-9bba6ef17008', 'metadata': {}}
{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_word', 'tags': [], 'run_id': '64fc99f0-5d7d-442b-b4f5-4537129f67d1', 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': '4321', 'input': '1234'}, 'run_id': '64fc99f0-5d7d-442b-b4f5-4537129f67d1', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
{'event': 'on_chain_stream', 'data': {'chunk': '43214321'}, 'run_id': '1bfcaedc-f4aa-4d8e-beee-9bba6ef17008', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': '43214321'}, 'run_id': '1bfcaedc-f4aa-4d8e-beee-9bba6ef17008', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}

下一步

现在您已经学习了一些使用 LangChain 流式传输最终输出和内部步骤的方法。

要了解更多信息,请查看本节中的其他操作指南,或 Langchain 表达式语言的概念指南


此页面是否对您有帮助?