Elasticsearch
Elasticsearch 是一个分布式、RESTful 的搜索和分析引擎,能够执行向量和词法搜索。它建立在 Apache Lucene 库之上。
本笔记本展示了如何使用与 Elasticsearch
向量存储相关的功能。
设置
为了使用 Elasticsearch
向量搜索,您必须安装 langchain-elasticsearch
包。
%pip install -qU langchain-elasticsearch
凭证
有两种主要方式可以设置 Elasticsearch 实例以供使用:
- Elastic Cloud:Elastic Cloud 是一种托管的 Elasticsearch 服务。注册免费试用。
要连接到不需要登录凭据的 Elasticsearch 实例(使用启用安全功能的 Docker 实例启动),请将 Elasticsearch URL 和索引名称以及嵌入对象传递给构造函数。
- 本地安装 Elasticsearch:通过在本地运行 Elasticsearch 来开始使用它。最简单的方法是使用官方的 Elasticsearch Docker 镜像。有关更多信息,请参阅Elasticsearch Docker 文档。
通过 Docker 运行 Elasticsearch
示例:运行禁用安全功能的单节点 Elasticsearch 实例。不建议在生产环境中使用。
%docker run -p 9200:9200 -e "discovery.type=single-node" -e "xpack.security.enabled=false" -e "xpack.security.http.ssl.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:8.12.1
使用身份验证运行
对于生产环境,我们建议您启用安全功能运行。要使用登录凭据连接,您可以使用参数 es_api_key
或 es_user
和 es_password
。
pip install -qU langchain-openai
import getpass
import os
if not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
from langchain_elasticsearch import ElasticsearchStore
elastic_vector_search = ElasticsearchStore(
es_url="https://:9200",
index_name="langchain_index",
embedding=embeddings,
es_user="elastic",
es_password="changeme",
)
如何获取默认“elastic”用户的密码?
要获取 Elastic Cloud 中默认“elastic”用户的密码
- 登录 Elastic Cloud 控制台:https://cloud.elastic.co
- 前往“安全” > “用户”
- 找到“elastic”用户并点击“编辑”
- 点击“重置密码”
- 按照提示重置密码
如何获取 API 密钥?
要获取 API 密钥
- 登录 Elastic Cloud 控制台:https://cloud.elastic.co
- 打开 Kibana 并前往“堆栈管理” > “API 密钥”
- 点击“创建 API 密钥”
- 输入 API 密钥的名称并点击“创建”
- 复制 API 密钥并将其粘贴到
api_key
参数中
Elastic Cloud
要连接到 Elastic Cloud 上的 Elasticsearch 实例,您可以使用 es_cloud_id
参数或 es_url
。
elastic_vector_search = ElasticsearchStore(
es_cloud_id="<cloud_id>",
index_name="test_index",
embedding=embeddings,
es_user="elastic",
es_password="changeme",
)
如果您想获得一流的模型调用自动化追踪,还可以通过取消注释下方代码来设置您的 LangSmith API 密钥
# os.environ["LANGSMITH_API_KEY"] = getpass.getpass("Enter your LangSmith API key: ")
# os.environ["LANGSMITH_TRACING"] = "true"
初始化
Elasticsearch 在 localhost:9200 上通过 docker 本地运行。有关如何从 Elastic Cloud 连接到 Elasticsearch 的更多详细信息,请参阅上面的使用身份验证连接。
from langchain_elasticsearch import ElasticsearchStore
vector_store = ElasticsearchStore(
"langchain-demo", embedding=embeddings, es_url="https://:9201"
)
管理向量存储
向向量存储添加项目
from uuid import uuid4
from langchain_core.documents import Document
document_1 = Document(
page_content="I had chocolate chip pancakes and scrambled eggs for breakfast this morning.",
metadata={"source": "tweet"},
)
document_2 = Document(
page_content="The weather forecast for tomorrow is cloudy and overcast, with a high of 62 degrees.",
metadata={"source": "news"},
)
document_3 = Document(
page_content="Building an exciting new project with LangChain - come check it out!",
metadata={"source": "tweet"},
)
document_4 = Document(
page_content="Robbers broke into the city bank and stole $1 million in cash.",
metadata={"source": "news"},
)
document_5 = Document(
page_content="Wow! That was an amazing movie. I can't wait to see it again.",
metadata={"source": "tweet"},
)
document_6 = Document(
page_content="Is the new iPhone worth the price? Read this review to find out.",
metadata={"source": "website"},
)
document_7 = Document(
page_content="The top 10 soccer players in the world right now.",
metadata={"source": "website"},
)
document_8 = Document(
page_content="LangGraph is the best framework for building stateful, agentic applications!",
metadata={"source": "tweet"},
)
document_9 = Document(
page_content="The stock market is down 500 points today due to fears of a recession.",
metadata={"source": "news"},
)
document_10 = Document(
page_content="I have a bad feeling I am going to get deleted :(",
metadata={"source": "tweet"},
)
documents = [
document_1,
document_2,
document_3,
document_4,
document_5,
document_6,
document_7,
document_8,
document_9,
document_10,
]
uuids = [str(uuid4()) for _ in range(len(documents))]
vector_store.add_documents(documents=documents, ids=uuids)
['21cca03c-9089-42d2-b41c-3d156be2b519',
'a6ceb967-b552-4802-bb06-c0e95fce386e',
'3a35fac4-e5f0-493b-bee0-9143b41aedae',
'176da099-66b1-4d6a-811b-dfdfe0808d30',
'ecfa1a30-3c97-408b-80c0-5c43d68bf5ff',
'c0f08baa-e70b-4f83-b387-c6e0a0f36f73',
'489b2c9c-1925-43e1-bcf0-0fa94cf1cbc4',
'408c6503-9ba4-49fd-b1cc-95584cd914c5',
'5248c899-16d5-4377-a9e9-736ca443ad4f',
'ca182769-c4fc-4e25-8f0a-8dd0a525955c']
从向量存储删除项目
vector_store.delete(ids=[uuids[-1]])
True
查询向量存储
一旦您的向量存储已创建并添加了相关文档,您很可能希望在运行链或代理时查询它。这些示例还展示了如何在搜索时使用过滤功能。
直接查询
相似性搜索
执行带元数据过滤的简单相似性搜索可以按如下方式进行
results = vector_store.similarity_search(
query="LangChain provides abstractions to make working with LLMs easy",
k=2,
filter=[{"term": {"metadata.source.keyword": "tweet"}}],
)
for res in results:
print(f"* {res.page_content} [{res.metadata}]")
* Building an exciting new project with LangChain - come check it out! [{'source': 'tweet'}]
* LangGraph is the best framework for building stateful, agentic applications! [{'source': 'tweet'}]
带分数的相似性搜索
如果您想执行相似性搜索并接收相应的分数,您可以运行
results = vector_store.similarity_search_with_score(
query="Will it be hot tomorrow",
k=1,
filter=[{"term": {"metadata.source.keyword": "news"}}],
)
for doc, score in results:
print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")
* [SIM=0.765887] The weather forecast for tomorrow is cloudy and overcast, with a high of 62 degrees. [{'source': 'news'}]
转换为检索器进行查询
您还可以将向量存储转换为检索器,以便在您的链中更方便地使用。
retriever = vector_store.as_retriever(
search_type="similarity_score_threshold", search_kwargs={"score_threshold": 0.2}
)
retriever.invoke("Stealing from the bank is a crime")
[Document(metadata={'source': 'news'}, page_content='Robbers broke into the city bank and stole $1 million in cash.'),
Document(metadata={'source': 'news'}, page_content='The stock market is down 500 points today due to fears of a recession.'),
Document(metadata={'source': 'website'}, page_content='Is the new iPhone worth the price? Read this review to find out.'),
Document(metadata={'source': 'tweet'}, page_content='Building an exciting new project with LangChain - come check it out!')]
距离相似度算法
Elasticsearch 支持以下向量距离相似度算法:
- 余弦
- 欧几里得
- 点积
余弦相似度算法是默认算法。
您可以通过 similarity 参数指定所需的相似度算法。
注意:根据检索策略,相似度算法无法在查询时更改。需要在为字段创建索引映射时进行设置。如果需要更改相似度算法,则需要删除索引并使用正确的 distance_strategy
重新创建。
db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="https://:9200",
index_name="test",
distance_strategy="COSINE",
# distance_strategy="EUCLIDEAN_DISTANCE"
# distance_strategy="DOT_PRODUCT"
)
检索策略
Elasticsearch 相对于其他纯向量数据库具有巨大优势,因为它能够支持广泛的检索策略。在本笔记本中,我们将配置 ElasticsearchStore
以支持一些最常见的检索策略。
默认情况下,ElasticsearchStore
使用 DenseVectorStrategy
(在 0.2.0 版本之前称为 ApproxRetrievalStrategy
)。
DenseVectorStrategy
这将返回与查询向量最相似的 top k 个向量。k
参数在初始化 ElasticsearchStore
时设置。默认值为 10。
from langchain_elasticsearch import DenseVectorStrategy
db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="https://:9200",
index_name="test",
strategy=DenseVectorStrategy(),
)
docs = db.similarity_search(
query="What did the president say about Ketanji Brown Jackson?", k=10
)
示例:密集向量和关键词搜索的混合检索
本示例将展示如何配置 ElasticsearchStore 以执行混合检索,结合使用近似语义搜索和基于关键词的搜索。
我们使用 RRF 来平衡来自不同检索方法的两个分数。
要启用混合检索,我们需要在 DenseVectorStrategy
构造函数中设置 hybrid=True
。
db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="https://:9200",
index_name="test",
strategy=DenseVectorStrategy(hybrid=True),
)
启用混合模式后,执行的查询将是近似语义搜索和基于关键词搜索的组合。
它将使用 RRF(倒数排序融合)来平衡来自不同检索方法的两个分数。
注意:RRF 需要 Elasticsearch 8.9.0 或更高版本。
{
"retriever": {
"rrf": {
"retrievers": [
{
"standard": {
"query": {
"bool": {
"filter": [],
"must": [{"match": {"text": {"query": "foo"}}}],
}
},
},
},
{
"knn": {
"field": "vector",
"filter": [],
"k": 1,
"num_candidates": 50,
"query_vector": [1.0, ..., 0.0],
},
},
]
}
}
}
示例:使用 Elasticsearch 中的嵌入模型进行密集向量搜索
本示例将展示如何配置 ElasticsearchStore
以使用部署在 Elasticsearch 中的嵌入模型进行密集向量检索。
要使用此功能,请通过 query_model_id
参数在 DenseVectorStrategy
构造函数中指定 model_id
。
注意:这要求模型已部署并在 Elasticsearch ML 节点中运行。请参阅笔记本示例,了解如何使用 eland
部署模型。
DENSE_SELF_DEPLOYED_INDEX_NAME = "test-dense-self-deployed"
# Note: This does not have an embedding function specified
# Instead, we will use the embedding model deployed in Elasticsearch
db = ElasticsearchStore(
es_cloud_id="<your cloud id>",
es_user="elastic",
es_password="<your password>",
index_name=DENSE_SELF_DEPLOYED_INDEX_NAME,
query_field="text_field",
vector_query_field="vector_query_field.predicted_value",
strategy=DenseVectorStrategy(model_id="sentence-transformers__all-minilm-l6-v2"),
)
# Setup a Ingest Pipeline to perform the embedding
# of the text field
db.client.ingest.put_pipeline(
id="test_pipeline",
processors=[
{
"inference": {
"model_id": "sentence-transformers__all-minilm-l6-v2",
"field_map": {"query_field": "text_field"},
"target_field": "vector_query_field",
}
}
],
)
# creating a new index with the pipeline,
# not relying on langchain to create the index
db.client.indices.create(
index=DENSE_SELF_DEPLOYED_INDEX_NAME,
mappings={
"properties": {
"text_field": {"type": "text"},
"vector_query_field": {
"properties": {
"predicted_value": {
"type": "dense_vector",
"dims": 384,
"index": True,
"similarity": "l2_norm",
}
}
},
}
},
settings={"index": {"default_pipeline": "test_pipeline"}},
)
db.from_texts(
["hello world"],
es_cloud_id="<cloud id>",
es_user="elastic",
es_password="<cloud password>",
index_name=DENSE_SELF_DEPLOYED_INDEX_NAME,
query_field="text_field",
vector_query_field="vector_query_field.predicted_value",
strategy=DenseVectorStrategy(model_id="sentence-transformers__all-minilm-l6-v2"),
)
# Perform search
db.similarity_search("hello world", k=10)
稀疏向量策略 (ELSER)
此策略使用 Elasticsearch 的稀疏向量检索来检索 top-k 结果。目前我们仅支持我们自己的“ELSER”嵌入模型。
注意:这要求 ELSER 模型已部署并在 Elasticsearch ml 节点中运行。
要使用此功能,请在 ElasticsearchStore
构造函数中指定 SparseVectorStrategy
(在 0.2.0 版本之前称为 SparseVectorRetrievalStrategy
)。您需要提供一个模型 ID。
from langchain_elasticsearch import SparseVectorStrategy
# Note that this example doesn't have an embedding function. This is because we infer the tokens at index time and at query time within Elasticsearch.
# This requires the ELSER model to be loaded and running in Elasticsearch.
db = ElasticsearchStore.from_documents(
docs,
es_cloud_id="<cloud id>",
es_user="elastic",
es_password="<cloud password>",
index_name="test-elser",
strategy=SparseVectorStrategy(model_id=".elser_model_2"),
)
db.client.indices.refresh(index="test-elser")
results = db.similarity_search(
"What did the president say about Ketanji Brown Jackson", k=4
)
print(results[0])
DenseVectorScriptScoreStrategy
此策略使用 Elasticsearch 的脚本评分查询来执行精确向量检索(也称为暴力搜索)以检索 top-k 结果。(此策略在 0.2.0 版本之前称为 ExactRetrievalStrategy
。)
要使用此功能,请在 ElasticsearchStore
构造函数中指定 DenseVectorScriptScoreStrategy
。
from langchain_elasticsearch import SparseVectorStrategy
db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="https://:9200",
index_name="test",
strategy=DenseVectorScriptScoreStrategy(),
)
BM25Strategy
最后,您可以使用全文关键词搜索。
要使用此功能,请在 ElasticsearchStore
构造函数中指定 BM25Strategy
。
from langchain_elasticsearch import BM25Strategy
db = ElasticsearchStore.from_documents(
docs,
es_url="https://:9200",
index_name="test",
strategy=BM25Strategy(),
)
BM25RetrievalStrategy
此策略允许用户使用纯 BM25 执行搜索,而无需向量搜索。
要使用此功能,请在 ElasticsearchStore
构造函数中指定 BM25RetrievalStrategy
。
请注意,在下面的示例中,未指定嵌入选项,这表明搜索是在不使用嵌入的情况下进行的。
from langchain_elasticsearch import ElasticsearchStore
db = ElasticsearchStore(
es_url="https://:9200",
index_name="test_index",
strategy=ElasticsearchStore.BM25RetrievalStrategy(),
)
db.add_texts(
["foo", "foo bar", "foo bar baz", "bar", "bar baz", "baz"],
)
results = db.similarity_search(query="foo", k=10)
print(results)
自定义查询
通过搜索时的 custom_query
参数,您可以调整用于从 Elasticsearch 检索文档的查询。如果您想使用更复杂的查询以支持字段的线性提升,这将非常有用。
# Example of a custom query thats just doing a BM25 search on the text field.
def custom_query(query_body: dict, query: str):
"""Custom query to be used in Elasticsearch.
Args:
query_body (dict): Elasticsearch query body.
query (str): Query string.
Returns:
dict: Elasticsearch query body.
"""
print("Query Retriever created by the retrieval strategy:")
print(query_body)
print()
new_query_body = {"query": {"match": {"text": query}}}
print("Query thats actually used in Elasticsearch:")
print(new_query_body)
print()
return new_query_body
results = db.similarity_search(
"What did the president say about Ketanji Brown Jackson",
k=4,
custom_query=custom_query,
)
print("Results:")
print(results[0])
自定义文档构建器
通过搜索时的 doc_builder
参数,您可以调整如何使用从 Elasticsearch 检索到的数据构建文档。如果您有不是使用 Langchain 创建的索引,这尤其有用。
from typing import Dict
from langchain_core.documents import Document
def custom_document_builder(hit: Dict) -> Document:
src = hit.get("_source", {})
return Document(
page_content=src.get("content", "Missing content!"),
metadata={
"page_number": src.get("page_number", -1),
"original_filename": src.get("original_filename", "Missing filename!"),
},
)
results = db.similarity_search(
"What did the president say about Ketanji Brown Jackson",
k=4,
doc_builder=custom_document_builder,
)
print("Results:")
print(results[0])
检索增强生成的使用
有关如何将此向量存储用于检索增强生成 (RAG) 的指南,请参阅以下部分
常见问题
问题:将文档索引到 Elasticsearch 时出现超时错误。如何解决?
一个可能的问题是您的文档索引到 Elasticsearch 可能需要更长时间。ElasticsearchStore 使用 Elasticsearch 批量 API,该 API 有一些您可以调整的默认设置,以减少超时错误的可能性。
当您使用 SparseVectorRetrievalStrategy 时,这也是一个好主意。
默认设置包括:
chunk_size
: 500max_chunk_bytes
: 100MB
要调整这些参数,您可以将 chunk_size
和 max_chunk_bytes
参数传递给 ElasticsearchStore 的 add_texts
方法。
vector_store.add_texts(
texts,
bulk_kwargs={
"chunk_size": 50,
"max_chunk_bytes": 200000000
}
)
升级到 ElasticsearchStore
如果您已经在基于 Langchain 的项目中使用了 Elasticsearch,您可能正在使用旧的实现:ElasticVectorSearch
和 ElasticKNNSearch
,它们现已弃用。我们引入了一个名为 ElasticsearchStore
的新实现,它更灵活且易于使用。本笔记本将指导您完成升级到新实现的过程。
有什么新功能?
新实现现在是一个名为 ElasticsearchStore
的类,可以通过策略用于近似密集向量、精确密集向量、稀疏向量 (ELSER)、BM25 检索和混合检索。
我正在使用 ElasticKNNSearch
旧实现
from langchain_community.vectorstores.elastic_vector_search import ElasticKNNSearch
db = ElasticKNNSearch(
elasticsearch_url="https://:9200",
index_name="test_index",
embedding=embedding
)
新实现
from langchain_elasticsearch import ElasticsearchStore, DenseVectorStrategy
db = ElasticsearchStore(
es_url="https://:9200",
index_name="test_index",
embedding=embedding,
# if you use the model_id
# strategy=DenseVectorStrategy(model_id="test_model")
# if you use hybrid search
# strategy=DenseVectorStrategy(hybrid=True)
)
我正在使用 ElasticVectorSearch
旧实现
from langchain_community.vectorstores.elastic_vector_search import ElasticVectorSearch
db = ElasticVectorSearch(
elasticsearch_url="https://:9200",
index_name="test_index",
embedding=embedding
)
新实现
from langchain_elasticsearch import ElasticsearchStore, DenseVectorScriptScoreStrategy
db = ElasticsearchStore(
es_url="https://:9200",
index_name="test_index",
embedding=embedding,
strategy=DenseVectorScriptScoreStrategy()
)
db.client.indices.delete(
index="test-metadata, test-elser, test-basic",
ignore_unavailable=True,
allow_no_indices=True,
)
API 参考
有关所有 ElasticSearchStore
功能和配置的详细文档,请访问 API 参考:https://python.langchain.ac.cn/api_reference/elasticsearch/vectorstores/langchain_elasticsearch.vectorstores.ElasticsearchStore.html