跳到主要内容

ElasticsearchRetriever

Elasticsearch 是一个分布式、RESTful 的搜索和分析引擎。它提供了一个分布式、多租户的全文本搜索引擎,具有 HTTP Web 接口和无模式 JSON 文档。它支持关键词搜索、向量搜索、混合搜索和复杂的过滤。

ElasticsearchRetriever 是一个通用包装器,可以通过 查询 DSL 灵活访问所有 Elasticsearch 功能。对于大多数用例,其他类 (ElasticsearchStoreElasticsearchEmbeddings 等) 应该足够了,但如果不够,您可以使用 ElasticsearchRetriever

本指南将帮助您开始使用 Elasticsearch 检索器。有关所有 ElasticsearchRetriever 功能和配置的详细文档,请访问 API 参考

集成详情

检索器自托管云服务
ElasticsearchRetrieverlangchain_elasticsearch

设置

设置 Elasticsearch 实例主要有两种方法

  • Elastic Cloud:Elastic Cloud 是一项托管 Elasticsearch 服务。注册一个 免费试用。要连接到不需要登录凭据的 Elasticsearch 实例(启动启用安全性的 docker 实例),请将 Elasticsearch URL 和索引名称以及嵌入对象传递给构造函数。

  • 本地安装 Elasticsearch:通过在本地运行 Elasticsearch 开始使用。最简单的方法是使用官方 Elasticsearch Docker 镜像。有关详细信息,请参阅 Elasticsearch Docker 文档

如果要从单个查询获取自动跟踪,还可以通过取消注释以下内容来设置您的 LangSmith API 密钥

# os.environ["LANGSMITH_API_KEY"] = getpass.getpass("Enter your LangSmith API key: ")
# os.environ["LANGSMITH_TRACING"] = "true"

安装

此检索器位于 langchain-elasticsearch 包中。出于演示目的,我们还将安装 langchain-community 来生成文本嵌入

%pip install -qU langchain-community langchain-elasticsearch
from typing import Any, Dict, Iterable

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from langchain_community.embeddings import DeterministicFakeEmbedding
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_elasticsearch import ElasticsearchRetriever

配置

这里我们定义了与 Elasticsearch 的连接。在本示例中,我们使用本地运行的实例。或者,您可以在 Elastic Cloud 中创建一个帐户并开始 免费试用

es_url = "https://127.0.0.1:9200"
es_client = Elasticsearch(hosts=[es_url])
es_client.info()

对于向量搜索,我们仅使用随机嵌入进行说明。对于实际用例,请选择可用的 LangChain 嵌入类之一。

embeddings = DeterministicFakeEmbedding(size=3)

定义示例数据

index_name = "test-langchain-retriever"
text_field = "text"
dense_vector_field = "fake_embedding"
num_characters_field = "num_characters"
texts = [
"foo",
"bar",
"world",
"hello world",
"hello",
"foo bar",
"bla bla foo",
]

索引数据

通常,当用户已经在 Elasticsearch 索引中拥有数据时,会使用 ElasticsearchRetriever。这里我们索引一些示例文本文档。如果使用 ElasticsearchStore.from_documents 创建索引,也可以。

def create_index(
es_client: Elasticsearch,
index_name: str,
text_field: str,
dense_vector_field: str,
num_characters_field: str,
):
es_client.indices.create(
index=index_name,
mappings={
"properties": {
text_field: {"type": "text"},
dense_vector_field: {"type": "dense_vector"},
num_characters_field: {"type": "integer"},
}
},
)


def index_data(
es_client: Elasticsearch,
index_name: str,
text_field: str,
dense_vector_field: str,
embeddings: Embeddings,
texts: Iterable[str],
refresh: bool = True,
) -> None:
create_index(
es_client, index_name, text_field, dense_vector_field, num_characters_field
)

vectors = embeddings.embed_documents(list(texts))
requests = [
{
"_op_type": "index",
"_index": index_name,
"_id": i,
text_field: text,
dense_vector_field: vector,
num_characters_field: len(text),
}
for i, (text, vector) in enumerate(zip(texts, vectors))
]

bulk(es_client, requests)

if refresh:
es_client.indices.refresh(index=index_name)

return len(requests)
index_data(es_client, index_name, text_field, dense_vector_field, embeddings, texts)
7

实例化

本示例中使用伪造嵌入进行密集向量检索。

def vector_query(search_query: str) -> Dict:
vector = embeddings.embed_query(search_query) # same embeddings as for indexing
return {
"knn": {
"field": dense_vector_field,
"query_vector": vector,
"k": 5,
"num_candidates": 10,
}
}


vector_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=vector_query,
content_field=text_field,
url=es_url,
)

vector_retriever.invoke("foo")
[Document(page_content='foo', metadata={'_index': 'test-langchain-index', '_id': '0', '_score': 1.0, '_source': {'fake_embedding': [-2.336764233933763, 0.27510289545940503, -0.7957597268194339], 'num_characters': 3}}),
Document(page_content='world', metadata={'_index': 'test-langchain-index', '_id': '2', '_score': 0.6770179, '_source': {'fake_embedding': [-0.7041151202179595, -1.4652961969276497, -0.25786766898672847], 'num_characters': 5}}),
Document(page_content='hello world', metadata={'_index': 'test-langchain-index', '_id': '3', '_score': 0.4816144, '_source': {'fake_embedding': [0.42728413221815387, -1.1889908285425348, -1.445433230084671], 'num_characters': 11}}),
Document(page_content='hello', metadata={'_index': 'test-langchain-index', '_id': '4', '_score': 0.46853775, '_source': {'fake_embedding': [-0.28560441330564046, 0.9958894823084921, 1.5489829880195058], 'num_characters': 5}}),
Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 0.2086992, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}})]

BM25

传统的关键词匹配。

def bm25_query(search_query: str) -> Dict:
return {
"query": {
"match": {
text_field: search_query,
},
},
}


bm25_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=bm25_query,
content_field=text_field,
url=es_url,
)

bm25_retriever.invoke("foo")
[Document(page_content='foo', metadata={'_index': 'test-langchain-index', '_id': '0', '_score': 0.9711467, '_source': {'fake_embedding': [-2.336764233933763, 0.27510289545940503, -0.7957597268194339], 'num_characters': 3}}),
Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 0.7437035, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}}),
Document(page_content='bla bla foo', metadata={'_index': 'test-langchain-index', '_id': '6', '_score': 0.6025789, '_source': {'fake_embedding': [1.7365927060137358, -0.5230400847844948, 0.7978339724186192], 'num_characters': 11}})]

使用 倒数排名融合 (RRF) 结合向量搜索和 BM25 搜索以合并结果集。

def hybrid_query(search_query: str) -> Dict:
vector = embeddings.embed_query(search_query) # same embeddings as for indexing
return {
"retriever": {
"rrf": {
"retrievers": [
{
"standard": {
"query": {
"match": {
text_field: search_query,
}
}
}
},
{
"knn": {
"field": dense_vector_field,
"query_vector": vector,
"k": 5,
"num_candidates": 10,
}
},
]
}
}
}


hybrid_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=hybrid_query,
content_field=text_field,
url=es_url,
)

hybrid_retriever.invoke("foo")
[Document(page_content='foo', metadata={'_index': 'test-langchain-index', '_id': '0', '_score': 0.9711467, '_source': {'fake_embedding': [-2.336764233933763, 0.27510289545940503, -0.7957597268194339], 'num_characters': 3}}),
Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 0.7437035, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}}),
Document(page_content='bla bla foo', metadata={'_index': 'test-langchain-index', '_id': '6', '_score': 0.6025789, '_source': {'fake_embedding': [1.7365927060137358, -0.5230400847844948, 0.7978339724186192], 'num_characters': 11}})]

模糊匹配

关键词匹配,带拼写容错。

def fuzzy_query(search_query: str) -> Dict:
return {
"query": {
"match": {
text_field: {
"query": search_query,
"fuzziness": "AUTO",
}
},
},
}


fuzzy_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=fuzzy_query,
content_field=text_field,
url=es_url,
)

fuzzy_retriever.invoke("fox") # note the character tolernace
[Document(page_content='foo', metadata={'_index': 'test-langchain-index', '_id': '0', '_score': 0.6474311, '_source': {'fake_embedding': [-2.336764233933763, 0.27510289545940503, -0.7957597268194339], 'num_characters': 3}}),
Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 0.49580228, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}}),
Document(page_content='bla bla foo', metadata={'_index': 'test-langchain-index', '_id': '6', '_score': 0.40171927, '_source': {'fake_embedding': [1.7365927060137358, -0.5230400847844948, 0.7978339724186192], 'num_characters': 11}})]

复杂过滤

对不同字段的过滤器进行组合。

def filter_query_func(search_query: str) -> Dict:
return {
"query": {
"bool": {
"must": [
{"range": {num_characters_field: {"gte": 5}}},
],
"must_not": [
{"prefix": {text_field: "bla"}},
],
"should": [
{"match": {text_field: search_query}},
],
}
}
}


filtering_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=filter_query_func,
content_field=text_field,
url=es_url,
)

filtering_retriever.invoke("foo")
[Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 1.7437035, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}}),
Document(page_content='world', metadata={'_index': 'test-langchain-index', '_id': '2', '_score': 1.0, '_source': {'fake_embedding': [-0.7041151202179595, -1.4652961969276497, -0.25786766898672847], 'num_characters': 5}}),
Document(page_content='hello world', metadata={'_index': 'test-langchain-index', '_id': '3', '_score': 1.0, '_source': {'fake_embedding': [0.42728413221815387, -1.1889908285425348, -1.445433230084671], 'num_characters': 11}}),
Document(page_content='hello', metadata={'_index': 'test-langchain-index', '_id': '4', '_score': 1.0, '_source': {'fake_embedding': [-0.28560441330564046, 0.9958894823084921, 1.5489829880195058], 'num_characters': 5}})]

请注意,查询匹配位于顶部。其他通过过滤器的文档也包含在结果集中,但它们都具有相同的分数。

自定义文档映射器

可以自定义将 Elasticsearch 结果(命中)映射到 LangChain 文档的函数。

def num_characters_mapper(hit: Dict[str, Any]) -> Document:
num_chars = hit["_source"][num_characters_field]
content = hit["_source"][text_field]
return Document(
page_content=f"This document has {num_chars} characters",
metadata={"text_content": content},
)


custom_mapped_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=filter_query_func,
document_mapper=num_characters_mapper,
url=es_url,
)

custom_mapped_retriever.invoke("foo")
[Document(page_content='This document has 7 characters', metadata={'text_content': 'foo bar'}),
Document(page_content='This document has 5 characters', metadata={'text_content': 'world'}),
Document(page_content='This document has 11 characters', metadata={'text_content': 'hello world'}),
Document(page_content='This document has 5 characters', metadata={'text_content': 'hello'})]

用法

按照上面的例子,我们使用 .invoke 发出单个查询。由于检索器是 Runnables,我们也可以使用 Runnable 接口 中的任何方法,例如 .batch

在链中使用

我们还可以将检索器集成到中,以构建更大的应用程序,例如一个简单的RAG应用程序。为了演示,我们还实例化了一个 OpenAI 聊天模型。

%pip install -qU langchain-openai
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template(
"""Answer the question based only on the context provided.

Context: {context}

Question: {question}"""
)

llm = ChatOpenAI(model="gpt-4o-mini")


def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)


chain = (
{"context": vector_retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
chain.invoke("what is foo?")

API 参考

有关所有 ElasticsearchRetriever 功能和配置的详细文档,请访问API 参考


此页是否有帮助?