如何创建动态(自构建)链
先决条件
本指南假设您熟悉以下内容
有时,我们希望在运行时构建链的一部分,这取决于链的输入(路由 是最常见的例子)。 我们可以使用 RunnableLambda 的一个非常有用的属性来创建这样的动态链,如果 RunnableLambda 返回一个 Runnable,则该 Runnable 本身会被调用。 让我们看一个例子。
选择 聊天模型
pip install -qU langchain-openai
import getpass
import os
if not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini")
# | echo: false
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-3-sonnet-20240229")
API 参考:ChatAnthropic
from operator import itemgetter
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnablePassthrough, chain
contextualize_instructions = """Convert the latest user question into a standalone question given the chat history. Don't answer the question, return the question and nothing else (no descriptive text)."""
contextualize_prompt = ChatPromptTemplate.from_messages(
[
("system", contextualize_instructions),
("placeholder", "{chat_history}"),
("human", "{question}"),
]
)
contextualize_question = contextualize_prompt | llm | StrOutputParser()
qa_instructions = (
"""Answer the user question given the following context:\n\n{context}."""
)
qa_prompt = ChatPromptTemplate.from_messages(
[("system", qa_instructions), ("human", "{question}")]
)
@chain
def contextualize_if_needed(input_: dict) -> Runnable:
if input_.get("chat_history"):
# NOTE: This is returning another Runnable, not an actual output.
return contextualize_question
else:
return RunnablePassthrough() | itemgetter("question")
@chain
def fake_retriever(input_: dict) -> str:
return "egypt's population in 2024 is about 111 million"
full_chain = (
RunnablePassthrough.assign(question=contextualize_if_needed).assign(
context=fake_retriever
)
| qa_prompt
| llm
| StrOutputParser()
)
full_chain.invoke(
{
"question": "what about egypt",
"chat_history": [
("human", "what's the population of indonesia"),
("ai", "about 276 million"),
],
}
)
"According to the context provided, Egypt's population in 2024 is estimated to be about 111 million."
这里的关键是 contextualize_if_needed
返回另一个 Runnable,而不是实际的输出。当执行完整链时,这个返回的 Runnable 本身会被运行。
查看跟踪,我们可以看到,由于我们传入了 chat_history,我们执行了 contextualize_question 链作为完整链的一部分: https://smith.langchain.com/public/9e0ae34c-4082-4f3f-beed-34a2a2f4c991/r
请注意,返回的 Runnable 的流式传输、批处理等功能都得到了保留
for chunk in contextualize_if_needed.stream(
{
"question": "what about egypt",
"chat_history": [
("human", "what's the population of indonesia"),
("ai", "about 276 million"),
],
}
):
print(chunk)
What
is
the
population
of
Egypt
?