跳转到主要内容

如何使用 LangChain 索引 API

在这里,我们将使用 LangChain 索引 API 查看基本的索引工作流程。

索引 API 允许您将来自任何来源的文档加载并保持同步到向量存储中。具体来说,它可以帮助

  • 避免将重复内容写入向量存储
  • 避免重新写入未更改的内容
  • 避免对未更改的内容重新计算嵌入

所有这些都应该节省您的时间和金钱,并改善您的向量搜索结果。

至关重要的是,索引 API 即使对于相对于原始源文档经过多个转换步骤(例如,通过文本分块)的文档也有效。

工作原理

LangChain 索引使用记录管理器 (RecordManager) 来跟踪文档写入向量存储的操作。

在索引内容时,会为每个文档计算哈希值,并将以下信息存储在记录管理器中

  • 文档哈希值(页面内容和元数据的哈希值)
  • 写入时间
  • 源 ID - 每个文档的元数据中应包含信息,以便我们确定此文档的最终来源

删除模式

在将文档索引到向量存储时,向量存储中可能存在一些应该删除的现有文档。在某些情况下,您可能希望删除任何与新索引文档来自相同来源的现有文档。在其他情况下,您可能希望批量删除所有现有文档。索引 API 删除模式允许您选择所需的行为。

清理模式去重内容可并行化清理已删除的源文档清理源文档和/或派生文档的更改清理时机
-
增量持续
完整索引结束后
范围完整索引结束后

None 不会执行任何自动清理,允许用户手动清理旧内容。

incrementalfullscoped_full 提供以下自动化清理:

  • 如果源文档或派生文档的内容发生更改,所有 3 种模式都将清理(删除)以前版本的内容。
  • 如果源文档已被删除(意味着它不包含在当前正在索引的文档中),则 full 清理模式将从向量存储中正确删除它,但 incrementalscoped_full 模式不会。

当内容发生更改时(例如,源 PDF 文件被修订),在索引期间会有一段时间,新版本和旧版本都可能返回给用户。这发生在写入新内容之后,但在删除旧版本之前。

  • incremental 索引可以最大限度地减少这段时间,因为它能够在写入时持续清理。
  • fullscoped_full 模式在所有批次写入完成后进行清理。

要求

  1. 不要将其与独立于索引 API 预先填充内容的存储一起使用,因为记录管理器不会知道以前已插入的记录。
  2. 仅适用于支持以下功能的 LangChain vectorstore
    • 按 ID 添加文档(带有 ids 参数的 add_documents 方法)
    • 按 ID 删除(带有 ids 参数的 delete 方法)

兼容的向量存储:AerospikeAnalyticDBAstraDBAwaDBAzureCosmosDBNoSqlVectorSearchAzureCosmosDBVectorSearchBagelCassandraChromaCouchbaseVectorStoreDashVectorDatabricksVectorSearchDeepLakeDingoElasticVectorSearchElasticsearchStoreFAISSHanaDBMilvusMongoDBAtlasVectorSearchMyScaleOpenSearchVectorSearchPGVectorPineconeQdrantRedisRocksetScaNNSingleStoreDBSupabaseVectorStoreSurrealDBStoreTimescaleVectorValdVDMSVearchVespaStoreWeaviateYellowbrickZepVectorStoreTencentVectorDBOpenSearchVectorSearch

注意

记录管理器依赖于基于时间的机制来确定可以清理哪些内容(当使用 fullincrementalscoped_full 清理模式时)。

如果两个任务背靠背运行,并且第一个任务在时钟时间发生变化之前完成,则第二个任务可能无法清理内容。

在实际设置中,这不太可能成为问题,原因如下:

  1. 记录管理器使用更高分辨率的时间戳。
  2. 数据需要在第一个和第二个任务运行之间发生更改,如果任务之间的时间间隔很小,则这种情况不太可能发生。
  3. 索引任务通常需要几毫秒以上的时间。

快速入门

from langchain.indexes import SQLRecordManager, index
from langchain_core.documents import Document
from langchain_elasticsearch import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings

初始化向量存储并设置嵌入

collection_name = "test_index"

embedding = OpenAIEmbeddings()

vectorstore = ElasticsearchStore(
es_url="https://127.0.0.1:9200", index_name="test_index", embedding=embedding
)

使用适当的命名空间初始化记录管理器。

建议: 使用一个命名空间,其中同时考虑向量存储和向量存储中的集合名称;例如,“redis/my_docs”、“chromadb/my_docs”或“postgres/my_docs”。

namespace = f"elasticsearch/{collection_name}"
record_manager = SQLRecordManager(
namespace, db_url="sqlite:///record_manager_cache.sql"
)

在使用记录管理器之前创建模式。

record_manager.create_schema()

让我们索引一些测试文档

doc1 = Document(page_content="kitty", metadata={"source": "kitty.txt"})
doc2 = Document(page_content="doggy", metadata={"source": "doggy.txt"})

索引到空的向量存储中

def _clear():
"""Hacky helper method to clear content. See the `full` mode section to to understand why it works."""
index([], record_manager, vectorstore, cleanup="full", source_id_key="source")

None 删除模式

此模式不执行旧版本内容的自动清理;但是,它仍然会处理内容去重。

_clear()
index(
[doc1, doc1, doc1, doc1, doc1],
record_manager,
vectorstore,
cleanup=None,
source_id_key="source",
)
{'num_added': 1, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
_clear()
index([doc1, doc2], record_manager, vectorstore, cleanup=None, source_id_key="source")
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}

第二次运行时,所有内容都将被跳过

index([doc1, doc2], record_manager, vectorstore, cleanup=None, source_id_key="source")
{'num_added': 0, 'num_updated': 0, 'num_skipped': 2, 'num_deleted': 0}

"incremental" 删除模式

_clear()
index(
[doc1, doc2],
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}

再次索引应导致两个文档都被跳过——也会跳过嵌入操作!

index(
[doc1, doc2],
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
{'num_added': 0, 'num_updated': 0, 'num_skipped': 2, 'num_deleted': 0}

如果我们没有提供任何带有增量索引模式的文档,则不会发生任何更改。

index([], record_manager, vectorstore, cleanup="incremental", source_id_key="source")
{'num_added': 0, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}

如果我们修改一个文档,新版本将被写入,并且所有共享相同来源的旧版本将被删除。

changed_doc_2 = Document(page_content="puppy", metadata={"source": "doggy.txt"})
index(
[changed_doc_2],
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
{'num_added': 1, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 1}

"full" 删除模式

full 模式下,用户应将应索引到索引函数中的完整内容传递给索引函数。

任何未传递到索引函数中并且存在于向量存储中的文档都将被删除!

此行为对于处理源文档的删除非常有用。

_clear()
all_docs = [doc1, doc2]
index(all_docs, record_manager, vectorstore, cleanup="full", source_id_key="source")
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}

假设有人删除了第一个文档

del all_docs[0]
all_docs
[Document(page_content='doggy', metadata={'source': 'doggy.txt'})]

使用完整模式也将清理已删除的内容。

index(all_docs, record_manager, vectorstore, cleanup="full", source_id_key="source")
{'num_added': 0, 'num_updated': 0, 'num_skipped': 1, 'num_deleted': 1}

来源

元数据属性包含一个名为 source 的字段。此来源应指向与给定文档关联的最终来源。

例如,如果这些文档表示某些父文档的块,则两个文档的 source 应该相同,并引用父文档。

通常,应该始终指定 source。仅当您从不打算使用 incremental 模式,并且由于某种原因无法正确指定 source 字段时,才使用 None

from langchain_text_splitters import CharacterTextSplitter
doc1 = Document(
page_content="kitty kitty kitty kitty kitty", metadata={"source": "kitty.txt"}
)
doc2 = Document(page_content="doggy doggy the doggy", metadata={"source": "doggy.txt"})
new_docs = CharacterTextSplitter(
separator="t", keep_separator=True, chunk_size=12, chunk_overlap=2
).split_documents([doc1, doc2])
new_docs
[Document(page_content='kitty kit', metadata={'source': 'kitty.txt'}),
Document(page_content='tty kitty ki', metadata={'source': 'kitty.txt'}),
Document(page_content='tty kitty', metadata={'source': 'kitty.txt'}),
Document(page_content='doggy doggy', metadata={'source': 'doggy.txt'}),
Document(page_content='the doggy', metadata={'source': 'doggy.txt'})]
_clear()
index(
new_docs,
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
{'num_added': 5, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
changed_doggy_docs = [
Document(page_content="woof woof", metadata={"source": "doggy.txt"}),
Document(page_content="woof woof woof", metadata={"source": "doggy.txt"}),
]

这应删除与 doggy.txt 来源关联的旧版本文档,并将其替换为新版本。

index(
changed_doggy_docs,
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 2}
vectorstore.similarity_search("dog", k=30)
[Document(page_content='woof woof', metadata={'source': 'doggy.txt'}),
Document(page_content='woof woof woof', metadata={'source': 'doggy.txt'}),
Document(page_content='tty kitty', metadata={'source': 'kitty.txt'}),
Document(page_content='tty kitty ki', metadata={'source': 'kitty.txt'}),
Document(page_content='kitty kit', metadata={'source': 'kitty.txt'})]

与加载器一起使用

索引可以接受文档的可迭代对象或任何加载器。

注意: 加载器必须正确设置源键。

from langchain_core.document_loaders import BaseLoader


class MyCustomLoader(BaseLoader):
def lazy_load(self):
text_splitter = CharacterTextSplitter(
separator="t", keep_separator=True, chunk_size=12, chunk_overlap=2
)
docs = [
Document(page_content="woof woof", metadata={"source": "doggy.txt"}),
Document(page_content="woof woof woof", metadata={"source": "doggy.txt"}),
]
yield from text_splitter.split_documents(docs)

def load(self):
return list(self.lazy_load())
API 参考:BaseLoader
_clear()
loader = MyCustomLoader()
loader.load()
[Document(page_content='woof woof', metadata={'source': 'doggy.txt'}),
Document(page_content='woof woof woof', metadata={'source': 'doggy.txt'})]
index(loader, record_manager, vectorstore, cleanup="full", source_id_key="source")
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
vectorstore.similarity_search("dog", k=30)
[Document(page_content='woof woof', metadata={'source': 'doggy.txt'}),
Document(page_content='woof woof woof', metadata={'source': 'doggy.txt'})]

此页是否对您有帮助?