ElasticsearchRetriever
Elasticsearch 是一个分布式、RESTful 的搜索和分析引擎。它提供了一个分布式、多租户的全文本搜索引擎,具有 HTTP Web 接口和无模式 JSON 文档。它支持关键词搜索、向量搜索、混合搜索和复杂的过滤。
ElasticsearchRetriever
是一个通用包装器,可以通过 查询 DSL 灵活访问所有 Elasticsearch
功能。对于大多数用例,其他类 (ElasticsearchStore
、ElasticsearchEmbeddings
等) 应该足够了,但如果不够,您可以使用 ElasticsearchRetriever
。
本指南将帮助您开始使用 Elasticsearch 检索器。有关所有 ElasticsearchRetriever
功能和配置的详细文档,请访问 API 参考。
集成详情
检索器 | 自托管 | 云服务 | 包 |
---|---|---|---|
ElasticsearchRetriever | ✅ | ✅ | langchain_elasticsearch |
设置
设置 Elasticsearch 实例主要有两种方法
-
Elastic Cloud:Elastic Cloud 是一项托管 Elasticsearch 服务。注册一个 免费试用。要连接到不需要登录凭据的 Elasticsearch 实例(启动启用安全性的 docker 实例),请将 Elasticsearch URL 和索引名称以及嵌入对象传递给构造函数。
-
本地安装 Elasticsearch:通过在本地运行 Elasticsearch 开始使用。最简单的方法是使用官方 Elasticsearch Docker 镜像。有关详细信息,请参阅 Elasticsearch Docker 文档。
如果要从单个查询获取自动跟踪,还可以通过取消注释以下内容来设置您的 LangSmith API 密钥
# os.environ["LANGSMITH_API_KEY"] = getpass.getpass("Enter your LangSmith API key: ")
# os.environ["LANGSMITH_TRACING"] = "true"
安装
此检索器位于 langchain-elasticsearch
包中。出于演示目的,我们还将安装 langchain-community
来生成文本嵌入。
%pip install -qU langchain-community langchain-elasticsearch
from typing import Any, Dict, Iterable
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from langchain_community.embeddings import DeterministicFakeEmbedding
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_elasticsearch import ElasticsearchRetriever
配置
这里我们定义了与 Elasticsearch 的连接。在本示例中,我们使用本地运行的实例。或者,您可以在 Elastic Cloud 中创建一个帐户并开始 免费试用。
es_url = "https://127.0.0.1:9200"
es_client = Elasticsearch(hosts=[es_url])
es_client.info()
对于向量搜索,我们仅使用随机嵌入进行说明。对于实际用例,请选择可用的 LangChain 嵌入类之一。
embeddings = DeterministicFakeEmbedding(size=3)
定义示例数据
index_name = "test-langchain-retriever"
text_field = "text"
dense_vector_field = "fake_embedding"
num_characters_field = "num_characters"
texts = [
"foo",
"bar",
"world",
"hello world",
"hello",
"foo bar",
"bla bla foo",
]
索引数据
通常,当用户已经在 Elasticsearch 索引中拥有数据时,会使用 ElasticsearchRetriever
。这里我们索引一些示例文本文档。如果使用 ElasticsearchStore.from_documents
创建索引,也可以。
def create_index(
es_client: Elasticsearch,
index_name: str,
text_field: str,
dense_vector_field: str,
num_characters_field: str,
):
es_client.indices.create(
index=index_name,
mappings={
"properties": {
text_field: {"type": "text"},
dense_vector_field: {"type": "dense_vector"},
num_characters_field: {"type": "integer"},
}
},
)
def index_data(
es_client: Elasticsearch,
index_name: str,
text_field: str,
dense_vector_field: str,
embeddings: Embeddings,
texts: Iterable[str],
refresh: bool = True,
) -> None:
create_index(
es_client, index_name, text_field, dense_vector_field, num_characters_field
)
vectors = embeddings.embed_documents(list(texts))
requests = [
{
"_op_type": "index",
"_index": index_name,
"_id": i,
text_field: text,
dense_vector_field: vector,
num_characters_field: len(text),
}
for i, (text, vector) in enumerate(zip(texts, vectors))
]
bulk(es_client, requests)
if refresh:
es_client.indices.refresh(index=index_name)
return len(requests)
index_data(es_client, index_name, text_field, dense_vector_field, embeddings, texts)
7
实例化
向量搜索
本示例中使用伪造嵌入进行密集向量检索。
def vector_query(search_query: str) -> Dict:
vector = embeddings.embed_query(search_query) # same embeddings as for indexing
return {
"knn": {
"field": dense_vector_field,
"query_vector": vector,
"k": 5,
"num_candidates": 10,
}
}
vector_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=vector_query,
content_field=text_field,
url=es_url,
)
vector_retriever.invoke("foo")
[Document(page_content='foo', metadata={'_index': 'test-langchain-index', '_id': '0', '_score': 1.0, '_source': {'fake_embedding': [-2.336764233933763, 0.27510289545940503, -0.7957597268194339], 'num_characters': 3}}),
Document(page_content='world', metadata={'_index': 'test-langchain-index', '_id': '2', '_score': 0.6770179, '_source': {'fake_embedding': [-0.7041151202179595, -1.4652961969276497, -0.25786766898672847], 'num_characters': 5}}),
Document(page_content='hello world', metadata={'_index': 'test-langchain-index', '_id': '3', '_score': 0.4816144, '_source': {'fake_embedding': [0.42728413221815387, -1.1889908285425348, -1.445433230084671], 'num_characters': 11}}),
Document(page_content='hello', metadata={'_index': 'test-langchain-index', '_id': '4', '_score': 0.46853775, '_source': {'fake_embedding': [-0.28560441330564046, 0.9958894823084921, 1.5489829880195058], 'num_characters': 5}}),
Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 0.2086992, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}})]
BM25
传统的关键词匹配。
def bm25_query(search_query: str) -> Dict:
return {
"query": {
"match": {
text_field: search_query,
},
},
}
bm25_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=bm25_query,
content_field=text_field,
url=es_url,
)
bm25_retriever.invoke("foo")
[Document(page_content='foo', metadata={'_index': 'test-langchain-index', '_id': '0', '_score': 0.9711467, '_source': {'fake_embedding': [-2.336764233933763, 0.27510289545940503, -0.7957597268194339], 'num_characters': 3}}),
Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 0.7437035, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}}),
Document(page_content='bla bla foo', metadata={'_index': 'test-langchain-index', '_id': '6', '_score': 0.6025789, '_source': {'fake_embedding': [1.7365927060137358, -0.5230400847844948, 0.7978339724186192], 'num_characters': 11}})]
混合搜索
使用 倒数排名融合 (RRF) 结合向量搜索和 BM25 搜索以合并结果集。
def hybrid_query(search_query: str) -> Dict:
vector = embeddings.embed_query(search_query) # same embeddings as for indexing
return {
"retriever": {
"rrf": {
"retrievers": [
{
"standard": {
"query": {
"match": {
text_field: search_query,
}
}
}
},
{
"knn": {
"field": dense_vector_field,
"query_vector": vector,
"k": 5,
"num_candidates": 10,
}
},
]
}
}
}
hybrid_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=hybrid_query,
content_field=text_field,
url=es_url,
)
hybrid_retriever.invoke("foo")
[Document(page_content='foo', metadata={'_index': 'test-langchain-index', '_id': '0', '_score': 0.9711467, '_source': {'fake_embedding': [-2.336764233933763, 0.27510289545940503, -0.7957597268194339], 'num_characters': 3}}),
Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 0.7437035, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}}),
Document(page_content='bla bla foo', metadata={'_index': 'test-langchain-index', '_id': '6', '_score': 0.6025789, '_source': {'fake_embedding': [1.7365927060137358, -0.5230400847844948, 0.7978339724186192], 'num_characters': 11}})]
模糊匹配
关键词匹配,带拼写容错。
def fuzzy_query(search_query: str) -> Dict:
return {
"query": {
"match": {
text_field: {
"query": search_query,
"fuzziness": "AUTO",
}
},
},
}
fuzzy_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=fuzzy_query,
content_field=text_field,
url=es_url,
)
fuzzy_retriever.invoke("fox") # note the character tolernace
[Document(page_content='foo', metadata={'_index': 'test-langchain-index', '_id': '0', '_score': 0.6474311, '_source': {'fake_embedding': [-2.336764233933763, 0.27510289545940503, -0.7957597268194339], 'num_characters': 3}}),
Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 0.49580228, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}}),
Document(page_content='bla bla foo', metadata={'_index': 'test-langchain-index', '_id': '6', '_score': 0.40171927, '_source': {'fake_embedding': [1.7365927060137358, -0.5230400847844948, 0.7978339724186192], 'num_characters': 11}})]
复杂过滤
对不同字段的过滤器进行组合。
def filter_query_func(search_query: str) -> Dict:
return {
"query": {
"bool": {
"must": [
{"range": {num_characters_field: {"gte": 5}}},
],
"must_not": [
{"prefix": {text_field: "bla"}},
],
"should": [
{"match": {text_field: search_query}},
],
}
}
}
filtering_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=filter_query_func,
content_field=text_field,
url=es_url,
)
filtering_retriever.invoke("foo")
[Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 1.7437035, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}}),
Document(page_content='world', metadata={'_index': 'test-langchain-index', '_id': '2', '_score': 1.0, '_source': {'fake_embedding': [-0.7041151202179595, -1.4652961969276497, -0.25786766898672847], 'num_characters': 5}}),
Document(page_content='hello world', metadata={'_index': 'test-langchain-index', '_id': '3', '_score': 1.0, '_source': {'fake_embedding': [0.42728413221815387, -1.1889908285425348, -1.445433230084671], 'num_characters': 11}}),
Document(page_content='hello', metadata={'_index': 'test-langchain-index', '_id': '4', '_score': 1.0, '_source': {'fake_embedding': [-0.28560441330564046, 0.9958894823084921, 1.5489829880195058], 'num_characters': 5}})]
请注意,查询匹配位于顶部。其他通过过滤器的文档也包含在结果集中,但它们都具有相同的分数。
自定义文档映射器
可以自定义将 Elasticsearch 结果(命中)映射到 LangChain 文档的函数。
def num_characters_mapper(hit: Dict[str, Any]) -> Document:
num_chars = hit["_source"][num_characters_field]
content = hit["_source"][text_field]
return Document(
page_content=f"This document has {num_chars} characters",
metadata={"text_content": content},
)
custom_mapped_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=filter_query_func,
document_mapper=num_characters_mapper,
url=es_url,
)
custom_mapped_retriever.invoke("foo")
[Document(page_content='This document has 7 characters', metadata={'text_content': 'foo bar'}),
Document(page_content='This document has 5 characters', metadata={'text_content': 'world'}),
Document(page_content='This document has 11 characters', metadata={'text_content': 'hello world'}),
Document(page_content='This document has 5 characters', metadata={'text_content': 'hello'})]
用法
按照上面的例子,我们使用 .invoke
发出单个查询。由于检索器是 Runnables,我们也可以使用 Runnable 接口 中的任何方法,例如 .batch
。
在链中使用
我们还可以将检索器集成到链中,以构建更大的应用程序,例如一个简单的RAG应用程序。为了演示,我们还实例化了一个 OpenAI 聊天模型。
%pip install -qU langchain-openai
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
prompt = ChatPromptTemplate.from_template(
"""Answer the question based only on the context provided.
Context: {context}
Question: {question}"""
)
llm = ChatOpenAI(model="gpt-4o-mini")
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)
chain = (
{"context": vector_retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
chain.invoke("what is foo?")
API 参考
有关所有 ElasticsearchRetriever
功能和配置的详细文档,请访问API 参考。