跳到主要内容
Open In ColabOpen on GitHub

Elasticsearch

Elasticsearch 是一个分布式、RESTful 的搜索和分析引擎,能够执行向量和词法搜索。它建立在 Apache Lucene 库之上。

本笔记本展示了如何使用与 Elasticsearch 向量存储相关的功能。

设置

为了使用 Elasticsearch 向量搜索,您必须安装 langchain-elasticsearch 包。

%pip install -qU langchain-elasticsearch

凭证

有两种主要方式可以设置 Elasticsearch 实例以供使用:

  1. Elastic Cloud:Elastic Cloud 是一种托管的 Elasticsearch 服务。注册免费试用

要连接到不需要登录凭据的 Elasticsearch 实例(使用启用安全功能的 Docker 实例启动),请将 Elasticsearch URL 和索引名称以及嵌入对象传递给构造函数。

  1. 本地安装 Elasticsearch:通过在本地运行 Elasticsearch 来开始使用它。最简单的方法是使用官方的 Elasticsearch Docker 镜像。有关更多信息,请参阅Elasticsearch Docker 文档

通过 Docker 运行 Elasticsearch

示例:运行禁用安全功能的单节点 Elasticsearch 实例。不建议在生产环境中使用。

%docker run -p 9200:9200 -e "discovery.type=single-node" -e "xpack.security.enabled=false" -e "xpack.security.http.ssl.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:8.12.1

使用身份验证运行

对于生产环境,我们建议您启用安全功能运行。要使用登录凭据连接,您可以使用参数 es_api_keyes_useres_password

pip install -qU langchain-openai
import getpass
import os

if not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
from langchain_elasticsearch import ElasticsearchStore

elastic_vector_search = ElasticsearchStore(
es_url="https://:9200",
index_name="langchain_index",
embedding=embeddings,
es_user="elastic",
es_password="changeme",
)
API 参考:ElasticsearchStore

如何获取默认“elastic”用户的密码?

要获取 Elastic Cloud 中默认“elastic”用户的密码

  1. 登录 Elastic Cloud 控制台:https://cloud.elastic.co
  2. 前往“安全” > “用户”
  3. 找到“elastic”用户并点击“编辑”
  4. 点击“重置密码”
  5. 按照提示重置密码

如何获取 API 密钥?

要获取 API 密钥

  1. 登录 Elastic Cloud 控制台:https://cloud.elastic.co
  2. 打开 Kibana 并前往“堆栈管理” > “API 密钥”
  3. 点击“创建 API 密钥”
  4. 输入 API 密钥的名称并点击“创建”
  5. 复制 API 密钥并将其粘贴到 api_key 参数中

Elastic Cloud

要连接到 Elastic Cloud 上的 Elasticsearch 实例,您可以使用 es_cloud_id 参数或 es_url

elastic_vector_search = ElasticsearchStore(
es_cloud_id="<cloud_id>",
index_name="test_index",
embedding=embeddings,
es_user="elastic",
es_password="changeme",
)

如果您想获得一流的模型调用自动化追踪,还可以通过取消注释下方代码来设置您的 LangSmith API 密钥

# os.environ["LANGSMITH_API_KEY"] = getpass.getpass("Enter your LangSmith API key: ")
# os.environ["LANGSMITH_TRACING"] = "true"

初始化

Elasticsearch 在 localhost:9200 上通过 docker 本地运行。有关如何从 Elastic Cloud 连接到 Elasticsearch 的更多详细信息,请参阅上面的使用身份验证连接

from langchain_elasticsearch import ElasticsearchStore

vector_store = ElasticsearchStore(
"langchain-demo", embedding=embeddings, es_url="https://:9201"
)
API 参考:ElasticsearchStore

管理向量存储

向向量存储添加项目

from uuid import uuid4

from langchain_core.documents import Document

document_1 = Document(
page_content="I had chocolate chip pancakes and scrambled eggs for breakfast this morning.",
metadata={"source": "tweet"},
)

document_2 = Document(
page_content="The weather forecast for tomorrow is cloudy and overcast, with a high of 62 degrees.",
metadata={"source": "news"},
)

document_3 = Document(
page_content="Building an exciting new project with LangChain - come check it out!",
metadata={"source": "tweet"},
)

document_4 = Document(
page_content="Robbers broke into the city bank and stole $1 million in cash.",
metadata={"source": "news"},
)

document_5 = Document(
page_content="Wow! That was an amazing movie. I can't wait to see it again.",
metadata={"source": "tweet"},
)

document_6 = Document(
page_content="Is the new iPhone worth the price? Read this review to find out.",
metadata={"source": "website"},
)

document_7 = Document(
page_content="The top 10 soccer players in the world right now.",
metadata={"source": "website"},
)

document_8 = Document(
page_content="LangGraph is the best framework for building stateful, agentic applications!",
metadata={"source": "tweet"},
)

document_9 = Document(
page_content="The stock market is down 500 points today due to fears of a recession.",
metadata={"source": "news"},
)

document_10 = Document(
page_content="I have a bad feeling I am going to get deleted :(",
metadata={"source": "tweet"},
)

documents = [
document_1,
document_2,
document_3,
document_4,
document_5,
document_6,
document_7,
document_8,
document_9,
document_10,
]
uuids = [str(uuid4()) for _ in range(len(documents))]

vector_store.add_documents(documents=documents, ids=uuids)
API 参考:Document
['21cca03c-9089-42d2-b41c-3d156be2b519',
'a6ceb967-b552-4802-bb06-c0e95fce386e',
'3a35fac4-e5f0-493b-bee0-9143b41aedae',
'176da099-66b1-4d6a-811b-dfdfe0808d30',
'ecfa1a30-3c97-408b-80c0-5c43d68bf5ff',
'c0f08baa-e70b-4f83-b387-c6e0a0f36f73',
'489b2c9c-1925-43e1-bcf0-0fa94cf1cbc4',
'408c6503-9ba4-49fd-b1cc-95584cd914c5',
'5248c899-16d5-4377-a9e9-736ca443ad4f',
'ca182769-c4fc-4e25-8f0a-8dd0a525955c']

从向量存储删除项目

vector_store.delete(ids=[uuids[-1]])
True

查询向量存储

一旦您的向量存储已创建并添加了相关文档,您很可能希望在运行链或代理时查询它。这些示例还展示了如何在搜索时使用过滤功能。

直接查询

执行带元数据过滤的简单相似性搜索可以按如下方式进行

results = vector_store.similarity_search(
query="LangChain provides abstractions to make working with LLMs easy",
k=2,
filter=[{"term": {"metadata.source.keyword": "tweet"}}],
)
for res in results:
print(f"* {res.page_content} [{res.metadata}]")
* Building an exciting new project with LangChain - come check it out! [{'source': 'tweet'}]
* LangGraph is the best framework for building stateful, agentic applications! [{'source': 'tweet'}]

带分数的相似性搜索

如果您想执行相似性搜索并接收相应的分数,您可以运行

results = vector_store.similarity_search_with_score(
query="Will it be hot tomorrow",
k=1,
filter=[{"term": {"metadata.source.keyword": "news"}}],
)
for doc, score in results:
print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")
* [SIM=0.765887] The weather forecast for tomorrow is cloudy and overcast, with a high of 62 degrees. [{'source': 'news'}]

转换为检索器进行查询

您还可以将向量存储转换为检索器,以便在您的链中更方便地使用。

retriever = vector_store.as_retriever(
search_type="similarity_score_threshold", search_kwargs={"score_threshold": 0.2}
)
retriever.invoke("Stealing from the bank is a crime")
[Document(metadata={'source': 'news'}, page_content='Robbers broke into the city bank and stole $1 million in cash.'),
Document(metadata={'source': 'news'}, page_content='The stock market is down 500 points today due to fears of a recession.'),
Document(metadata={'source': 'website'}, page_content='Is the new iPhone worth the price? Read this review to find out.'),
Document(metadata={'source': 'tweet'}, page_content='Building an exciting new project with LangChain - come check it out!')]

距离相似度算法

Elasticsearch 支持以下向量距离相似度算法:

  • 余弦
  • 欧几里得
  • 点积

余弦相似度算法是默认算法。

您可以通过 similarity 参数指定所需的相似度算法。

注意:根据检索策略,相似度算法无法在查询时更改。需要在为字段创建索引映射时进行设置。如果需要更改相似度算法,则需要删除索引并使用正确的 distance_strategy 重新创建。

db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="https://:9200",
index_name="test",
distance_strategy="COSINE",
# distance_strategy="EUCLIDEAN_DISTANCE"
# distance_strategy="DOT_PRODUCT"
)

检索策略

Elasticsearch 相对于其他纯向量数据库具有巨大优势,因为它能够支持广泛的检索策略。在本笔记本中,我们将配置 ElasticsearchStore 以支持一些最常见的检索策略。

默认情况下,ElasticsearchStore 使用 DenseVectorStrategy(在 0.2.0 版本之前称为 ApproxRetrievalStrategy)。

DenseVectorStrategy

这将返回与查询向量最相似的 top k 个向量。k 参数在初始化 ElasticsearchStore 时设置。默认值为 10。

from langchain_elasticsearch import DenseVectorStrategy

db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="https://:9200",
index_name="test",
strategy=DenseVectorStrategy(),
)

docs = db.similarity_search(
query="What did the president say about Ketanji Brown Jackson?", k=10
)
API 参考:DenseVectorStrategy

本示例将展示如何配置 ElasticsearchStore 以执行混合检索,结合使用近似语义搜索和基于关键词的搜索。

我们使用 RRF 来平衡来自不同检索方法的两个分数。

要启用混合检索,我们需要在 DenseVectorStrategy 构造函数中设置 hybrid=True

db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="https://:9200",
index_name="test",
strategy=DenseVectorStrategy(hybrid=True),
)

启用混合模式后,执行的查询将是近似语义搜索和基于关键词搜索的组合。

它将使用 RRF(倒数排序融合)来平衡来自不同检索方法的两个分数。

注意:RRF 需要 Elasticsearch 8.9.0 或更高版本。

{
"retriever": {
"rrf": {
"retrievers": [
{
"standard": {
"query": {
"bool": {
"filter": [],
"must": [{"match": {"text": {"query": "foo"}}}],
}
},
},
},
{
"knn": {
"field": "vector",
"filter": [],
"k": 1,
"num_candidates": 50,
"query_vector": [1.0, ..., 0.0],
},
},
]
}
}
}

示例:使用 Elasticsearch 中的嵌入模型进行密集向量搜索

本示例将展示如何配置 ElasticsearchStore 以使用部署在 Elasticsearch 中的嵌入模型进行密集向量检索。

要使用此功能,请通过 query_model_id 参数在 DenseVectorStrategy 构造函数中指定 model_id

注意:这要求模型已部署并在 Elasticsearch ML 节点中运行。请参阅笔记本示例,了解如何使用 eland 部署模型。

DENSE_SELF_DEPLOYED_INDEX_NAME = "test-dense-self-deployed"

# Note: This does not have an embedding function specified
# Instead, we will use the embedding model deployed in Elasticsearch
db = ElasticsearchStore(
es_cloud_id="<your cloud id>",
es_user="elastic",
es_password="<your password>",
index_name=DENSE_SELF_DEPLOYED_INDEX_NAME,
query_field="text_field",
vector_query_field="vector_query_field.predicted_value",
strategy=DenseVectorStrategy(model_id="sentence-transformers__all-minilm-l6-v2"),
)

# Setup a Ingest Pipeline to perform the embedding
# of the text field
db.client.ingest.put_pipeline(
id="test_pipeline",
processors=[
{
"inference": {
"model_id": "sentence-transformers__all-minilm-l6-v2",
"field_map": {"query_field": "text_field"},
"target_field": "vector_query_field",
}
}
],
)

# creating a new index with the pipeline,
# not relying on langchain to create the index
db.client.indices.create(
index=DENSE_SELF_DEPLOYED_INDEX_NAME,
mappings={
"properties": {
"text_field": {"type": "text"},
"vector_query_field": {
"properties": {
"predicted_value": {
"type": "dense_vector",
"dims": 384,
"index": True,
"similarity": "l2_norm",
}
}
},
}
},
settings={"index": {"default_pipeline": "test_pipeline"}},
)

db.from_texts(
["hello world"],
es_cloud_id="<cloud id>",
es_user="elastic",
es_password="<cloud password>",
index_name=DENSE_SELF_DEPLOYED_INDEX_NAME,
query_field="text_field",
vector_query_field="vector_query_field.predicted_value",
strategy=DenseVectorStrategy(model_id="sentence-transformers__all-minilm-l6-v2"),
)

# Perform search
db.similarity_search("hello world", k=10)

稀疏向量策略 (ELSER)

此策略使用 Elasticsearch 的稀疏向量检索来检索 top-k 结果。目前我们仅支持我们自己的“ELSER”嵌入模型。

注意:这要求 ELSER 模型已部署并在 Elasticsearch ml 节点中运行。

要使用此功能,请在 ElasticsearchStore 构造函数中指定 SparseVectorStrategy(在 0.2.0 版本之前称为 SparseVectorRetrievalStrategy)。您需要提供一个模型 ID。

from langchain_elasticsearch import SparseVectorStrategy

# Note that this example doesn't have an embedding function. This is because we infer the tokens at index time and at query time within Elasticsearch.
# This requires the ELSER model to be loaded and running in Elasticsearch.
db = ElasticsearchStore.from_documents(
docs,
es_cloud_id="<cloud id>",
es_user="elastic",
es_password="<cloud password>",
index_name="test-elser",
strategy=SparseVectorStrategy(model_id=".elser_model_2"),
)

db.client.indices.refresh(index="test-elser")

results = db.similarity_search(
"What did the president say about Ketanji Brown Jackson", k=4
)
print(results[0])

DenseVectorScriptScoreStrategy

此策略使用 Elasticsearch 的脚本评分查询来执行精确向量检索(也称为暴力搜索)以检索 top-k 结果。(此策略在 0.2.0 版本之前称为 ExactRetrievalStrategy。)

要使用此功能,请在 ElasticsearchStore 构造函数中指定 DenseVectorScriptScoreStrategy

from langchain_elasticsearch import SparseVectorStrategy

db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="https://:9200",
index_name="test",
strategy=DenseVectorScriptScoreStrategy(),
)

BM25Strategy

最后,您可以使用全文关键词搜索。

要使用此功能,请在 ElasticsearchStore 构造函数中指定 BM25Strategy

from langchain_elasticsearch import BM25Strategy

db = ElasticsearchStore.from_documents(
docs,
es_url="https://:9200",
index_name="test",
strategy=BM25Strategy(),
)
API 参考:BM25Strategy

BM25RetrievalStrategy

此策略允许用户使用纯 BM25 执行搜索,而无需向量搜索。

要使用此功能,请在 ElasticsearchStore 构造函数中指定 BM25RetrievalStrategy

请注意,在下面的示例中,未指定嵌入选项,这表明搜索是在不使用嵌入的情况下进行的。

from langchain_elasticsearch import ElasticsearchStore

db = ElasticsearchStore(
es_url="https://:9200",
index_name="test_index",
strategy=ElasticsearchStore.BM25RetrievalStrategy(),
)

db.add_texts(
["foo", "foo bar", "foo bar baz", "bar", "bar baz", "baz"],
)

results = db.similarity_search(query="foo", k=10)
print(results)
API 参考:ElasticsearchStore

自定义查询

通过搜索时的 custom_query 参数,您可以调整用于从 Elasticsearch 检索文档的查询。如果您想使用更复杂的查询以支持字段的线性提升,这将非常有用。

# Example of a custom query thats just doing a BM25 search on the text field.
def custom_query(query_body: dict, query: str):
"""Custom query to be used in Elasticsearch.
Args:
query_body (dict): Elasticsearch query body.
query (str): Query string.
Returns:
dict: Elasticsearch query body.
"""
print("Query Retriever created by the retrieval strategy:")
print(query_body)
print()

new_query_body = {"query": {"match": {"text": query}}}

print("Query thats actually used in Elasticsearch:")
print(new_query_body)
print()

return new_query_body


results = db.similarity_search(
"What did the president say about Ketanji Brown Jackson",
k=4,
custom_query=custom_query,
)
print("Results:")
print(results[0])

自定义文档构建器

通过搜索时的 doc_builder 参数,您可以调整如何使用从 Elasticsearch 检索到的数据构建文档。如果您有不是使用 Langchain 创建的索引,这尤其有用。

from typing import Dict

from langchain_core.documents import Document


def custom_document_builder(hit: Dict) -> Document:
src = hit.get("_source", {})
return Document(
page_content=src.get("content", "Missing content!"),
metadata={
"page_number": src.get("page_number", -1),
"original_filename": src.get("original_filename", "Missing filename!"),
},
)


results = db.similarity_search(
"What did the president say about Ketanji Brown Jackson",
k=4,
doc_builder=custom_document_builder,
)
print("Results:")
print(results[0])
API 参考:Document

检索增强生成的使用

有关如何将此向量存储用于检索增强生成 (RAG) 的指南,请参阅以下部分

常见问题

问题:将文档索引到 Elasticsearch 时出现超时错误。如何解决?

一个可能的问题是您的文档索引到 Elasticsearch 可能需要更长时间。ElasticsearchStore 使用 Elasticsearch 批量 API,该 API 有一些您可以调整的默认设置,以减少超时错误的可能性。

当您使用 SparseVectorRetrievalStrategy 时,这也是一个好主意。

默认设置包括:

  • chunk_size: 500
  • max_chunk_bytes: 100MB

要调整这些参数,您可以将 chunk_sizemax_chunk_bytes 参数传递给 ElasticsearchStore 的 add_texts 方法。

    vector_store.add_texts(
texts,
bulk_kwargs={
"chunk_size": 50,
"max_chunk_bytes": 200000000
}
)

升级到 ElasticsearchStore

如果您已经在基于 Langchain 的项目中使用了 Elasticsearch,您可能正在使用旧的实现:ElasticVectorSearchElasticKNNSearch,它们现已弃用。我们引入了一个名为 ElasticsearchStore 的新实现,它更灵活且易于使用。本笔记本将指导您完成升级到新实现的过程。

有什么新功能?

新实现现在是一个名为 ElasticsearchStore 的类,可以通过策略用于近似密集向量、精确密集向量、稀疏向量 (ELSER)、BM25 检索和混合检索。

我正在使用 ElasticKNNSearch

旧实现


from langchain_community.vectorstores.elastic_vector_search import ElasticKNNSearch

db = ElasticKNNSearch(
elasticsearch_url="https://:9200",
index_name="test_index",
embedding=embedding
)

新实现


from langchain_elasticsearch import ElasticsearchStore, DenseVectorStrategy

db = ElasticsearchStore(
es_url="https://:9200",
index_name="test_index",
embedding=embedding,
# if you use the model_id
# strategy=DenseVectorStrategy(model_id="test_model")
# if you use hybrid search
# strategy=DenseVectorStrategy(hybrid=True)
)

我正在使用 ElasticVectorSearch

旧实现


from langchain_community.vectorstores.elastic_vector_search import ElasticVectorSearch

db = ElasticVectorSearch(
elasticsearch_url="https://:9200",
index_name="test_index",
embedding=embedding
)

API 参考:ElasticVectorSearch

新实现


from langchain_elasticsearch import ElasticsearchStore, DenseVectorScriptScoreStrategy

db = ElasticsearchStore(
es_url="https://:9200",
index_name="test_index",
embedding=embedding,
strategy=DenseVectorScriptScoreStrategy()
)

db.client.indices.delete(
index="test-metadata, test-elser, test-basic",
ignore_unavailable=True,
allow_no_indices=True,
)

API 参考

有关所有 ElasticSearchStore 功能和配置的详细文档,请访问 API 参考:https://python.langchain.ac.cn/api_reference/elasticsearch/vectorstores/langchain_elasticsearch.vectorstores.ElasticsearchStore.html