跳至主要内容

ElasticsearchRetriever

Elasticsearch 是一个分布式、RESTful 的搜索和分析引擎。它提供了一个分布式、多租户功能的全文搜索引擎,具有 HTTP 网页界面和无模式 JSON 文档。它支持关键字搜索、向量搜索、混合搜索和复杂过滤。

ElasticsearchRetriever 是一个通用包装器,可以通过 查询 DSL 灵活访问所有 Elasticsearch 功能。对于大多数用例,其他类(ElasticsearchStoreElasticsearchEmbeddings 等)应该足够了,但如果它们不够,您可以使用 ElasticsearchRetriever

本指南将帮助您开始使用 Elasticsearch 检索器。有关所有 ElasticsearchRetriever 功能和配置的详细文档,请访问 API 参考

集成详细信息

检索器自托管云服务
ElasticsearchRetrieverlangchain_elasticsearch

设置

有两种主要方法可以设置 Elasticsearch 实例

  • Elastic Cloud:Elastic Cloud 是一种托管的 Elasticsearch 服务。注册一个 免费试用。要连接到不需要登录凭据的 Elasticsearch 实例(使用启用安全性的方式启动 docker 实例),请将 Elasticsearch URL 和索引名称以及嵌入对象一起传递给构造函数。

  • 本地安装 Elasticsearch:通过在本地运行它来开始使用 Elasticsearch。最简单的方法是使用官方 Elasticsearch Docker 镜像。有关更多信息,请参阅 Elasticsearch Docker 文档

如果您希望从单个查询获取自动跟踪,您还可以设置您的 LangSmith API 密钥,方法是在下面取消注释

# os.environ["LANGSMITH_API_KEY"] = getpass.getpass("Enter your LangSmith API key: ")
# os.environ["LANGSMITH_TRACING"] = "true"

安装

此检索器位于 langchain-elasticsearch 包中。出于演示目的,我们还将安装 langchain-community 以生成文本 嵌入

%pip install -qU langchain-community langchain-elasticsearch
from typing import Any, Dict, Iterable

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from langchain_community.embeddings import DeterministicFakeEmbedding
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_elasticsearch import ElasticsearchRetriever

配置

在这里,我们定义了与 Elasticsearch 的连接。在本例中,我们使用本地运行的实例。或者,您可以在 Elastic Cloud 中创建一个帐户并开始 免费试用

es_url = "http://localhost:9200"
es_client = Elasticsearch(hosts=[es_url])
es_client.info()

对于向量搜索,我们将使用随机嵌入仅用于说明。对于实际用例,请选择可用的 LangChain 嵌入 类之一。

embeddings = DeterministicFakeEmbedding(size=3)

定义示例数据

index_name = "test-langchain-retriever"
text_field = "text"
dense_vector_field = "fake_embedding"
num_characters_field = "num_characters"
texts = [
"foo",
"bar",
"world",
"hello world",
"hello",
"foo bar",
"bla bla foo",
]

索引数据

通常,用户在 Elasticsearch 索引中已有数据时会使用 ElasticsearchRetriever。这里我们索引一些示例文本文档。如果您例如使用 ElasticsearchStore.from_documents 创建了一个索引,那也很好。

def create_index(
es_client: Elasticsearch,
index_name: str,
text_field: str,
dense_vector_field: str,
num_characters_field: str,
):
es_client.indices.create(
index=index_name,
mappings={
"properties": {
text_field: {"type": "text"},
dense_vector_field: {"type": "dense_vector"},
num_characters_field: {"type": "integer"},
}
},
)


def index_data(
es_client: Elasticsearch,
index_name: str,
text_field: str,
dense_vector_field: str,
embeddings: Embeddings,
texts: Iterable[str],
refresh: bool = True,
) -> None:
create_index(
es_client, index_name, text_field, dense_vector_field, num_characters_field
)

vectors = embeddings.embed_documents(list(texts))
requests = [
{
"_op_type": "index",
"_index": index_name,
"_id": i,
text_field: text,
dense_vector_field: vector,
num_characters_field: len(text),
}
for i, (text, vector) in enumerate(zip(texts, vectors))
]

bulk(es_client, requests)

if refresh:
es_client.indices.refresh(index=index_name)

return len(requests)
index_data(es_client, index_name, text_field, dense_vector_field, embeddings, texts)
7

实例化

在本例中,使用伪造的嵌入进行密集向量检索。

def vector_query(search_query: str) -> Dict:
vector = embeddings.embed_query(search_query) # same embeddings as for indexing
return {
"knn": {
"field": dense_vector_field,
"query_vector": vector,
"k": 5,
"num_candidates": 10,
}
}


vector_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=vector_query,
content_field=text_field,
url=es_url,
)

vector_retriever.invoke("foo")
[Document(page_content='foo', metadata={'_index': 'test-langchain-index', '_id': '0', '_score': 1.0, '_source': {'fake_embedding': [-2.336764233933763, 0.27510289545940503, -0.7957597268194339], 'num_characters': 3}}),
Document(page_content='world', metadata={'_index': 'test-langchain-index', '_id': '2', '_score': 0.6770179, '_source': {'fake_embedding': [-0.7041151202179595, -1.4652961969276497, -0.25786766898672847], 'num_characters': 5}}),
Document(page_content='hello world', metadata={'_index': 'test-langchain-index', '_id': '3', '_score': 0.4816144, '_source': {'fake_embedding': [0.42728413221815387, -1.1889908285425348, -1.445433230084671], 'num_characters': 11}}),
Document(page_content='hello', metadata={'_index': 'test-langchain-index', '_id': '4', '_score': 0.46853775, '_source': {'fake_embedding': [-0.28560441330564046, 0.9958894823084921, 1.5489829880195058], 'num_characters': 5}}),
Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 0.2086992, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}})]

BM25

传统的关键字匹配。

def bm25_query(search_query: str) -> Dict:
return {
"query": {
"match": {
text_field: search_query,
},
},
}


bm25_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=bm25_query,
content_field=text_field,
url=es_url,
)

bm25_retriever.invoke("foo")
[Document(page_content='foo', metadata={'_index': 'test-langchain-index', '_id': '0', '_score': 0.9711467, '_source': {'fake_embedding': [-2.336764233933763, 0.27510289545940503, -0.7957597268194339], 'num_characters': 3}}),
Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 0.7437035, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}}),
Document(page_content='bla bla foo', metadata={'_index': 'test-langchain-index', '_id': '6', '_score': 0.6025789, '_source': {'fake_embedding': [1.7365927060137358, -0.5230400847844948, 0.7978339724186192], 'num_characters': 11}})]

使用 倒排文件融合(RRF)结合向量搜索和 BM25 搜索来组合结果集。

def hybrid_query(search_query: str) -> Dict:
vector = embeddings.embed_query(search_query) # same embeddings as for indexing
return {
"query": {
"match": {
text_field: search_query,
},
},
"knn": {
"field": dense_vector_field,
"query_vector": vector,
"k": 5,
"num_candidates": 10,
},
"rank": {"rrf": {}},
}


hybrid_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=hybrid_query,
content_field=text_field,
url=es_url,
)

hybrid_retriever.invoke("foo")
[Document(page_content='foo', metadata={'_index': 'test-langchain-index', '_id': '0', '_score': 0.9711467, '_source': {'fake_embedding': [-2.336764233933763, 0.27510289545940503, -0.7957597268194339], 'num_characters': 3}}),
Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 0.7437035, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}}),
Document(page_content='bla bla foo', metadata={'_index': 'test-langchain-index', '_id': '6', '_score': 0.6025789, '_source': {'fake_embedding': [1.7365927060137358, -0.5230400847844948, 0.7978339724186192], 'num_characters': 11}})]

模糊匹配

具有拼写容错的关键字匹配。

def fuzzy_query(search_query: str) -> Dict:
return {
"query": {
"match": {
text_field: {
"query": search_query,
"fuzziness": "AUTO",
}
},
},
}


fuzzy_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=fuzzy_query,
content_field=text_field,
url=es_url,
)

fuzzy_retriever.invoke("fox") # note the character tolernace
[Document(page_content='foo', metadata={'_index': 'test-langchain-index', '_id': '0', '_score': 0.6474311, '_source': {'fake_embedding': [-2.336764233933763, 0.27510289545940503, -0.7957597268194339], 'num_characters': 3}}),
Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 0.49580228, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}}),
Document(page_content='bla bla foo', metadata={'_index': 'test-langchain-index', '_id': '6', '_score': 0.40171927, '_source': {'fake_embedding': [1.7365927060137358, -0.5230400847844948, 0.7978339724186192], 'num_characters': 11}})]

复杂过滤

在不同字段上组合过滤器。

def filter_query_func(search_query: str) -> Dict:
return {
"query": {
"bool": {
"must": [
{"range": {num_characters_field: {"gte": 5}}},
],
"must_not": [
{"prefix": {text_field: "bla"}},
],
"should": [
{"match": {text_field: search_query}},
],
}
}
}


filtering_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=filter_query_func,
content_field=text_field,
url=es_url,
)

filtering_retriever.invoke("foo")
[Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 1.7437035, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}}),
Document(page_content='world', metadata={'_index': 'test-langchain-index', '_id': '2', '_score': 1.0, '_source': {'fake_embedding': [-0.7041151202179595, -1.4652961969276497, -0.25786766898672847], 'num_characters': 5}}),
Document(page_content='hello world', metadata={'_index': 'test-langchain-index', '_id': '3', '_score': 1.0, '_source': {'fake_embedding': [0.42728413221815387, -1.1889908285425348, -1.445433230084671], 'num_characters': 11}}),
Document(page_content='hello', metadata={'_index': 'test-langchain-index', '_id': '4', '_score': 1.0, '_source': {'fake_embedding': [-0.28560441330564046, 0.9958894823084921, 1.5489829880195058], 'num_characters': 5}})]

请注意,查询匹配位于顶部。通过过滤器的其他文档也包含在结果集中,但它们都具有相同的得分。

自定义文档映射器

可以自定义将 Elasticsearch 结果(命中)映射到 LangChain 文档的函数。

def num_characters_mapper(hit: Dict[str, Any]) -> Document:
num_chars = hit["_source"][num_characters_field]
content = hit["_source"][text_field]
return Document(
page_content=f"This document has {num_chars} characters",
metadata={"text_content": content},
)


custom_mapped_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=filter_query_func,
document_mapper=num_characters_mapper,
url=es_url,
)

custom_mapped_retriever.invoke("foo")
[Document(page_content='This document has 7 characters', metadata={'text_content': 'foo bar'}),
Document(page_content='This document has 5 characters', metadata={'text_content': 'world'}),
Document(page_content='This document has 11 characters', metadata={'text_content': 'hello world'}),
Document(page_content='This document has 5 characters', metadata={'text_content': 'hello'})]

用法

根据以上示例,我们使用 .invoke 发出单个查询。因为检索器是可运行的,所以我们也可以使用 可运行接口 中的任何方法,例如 .batch

在链中使用

我们还可以将检索器整合到 中以构建更大的应用程序,例如一个简单的 RAG 应用程序。出于演示目的,我们还实例化了一个 OpenAI 聊天模型。

%pip install -qU langchain-openai
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template(
"""Answer the question based only on the context provided.

Context: {context}

Question: {question}"""
)

llm = ChatOpenAI(model="gpt-3.5-turbo-0125")


def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)


chain = (
{"context": vector_retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
chain.invoke("what is foo?")

API 参考

有关所有 ElasticsearchRetriever 功能和配置的详细文档,请访问 API 参考


此页面是否有帮助?


您还可以留下详细的反馈 在 GitHub 上.