如何使用 LangChain 索引 API
在这里,我们将使用 LangChain 索引 API 查看基本的索引工作流程。
索引 API 允许您将来自任何来源的文档加载并保持同步到向量存储中。具体来说,它可以帮助
- 避免将重复内容写入向量存储
- 避免重新写入未更改的内容
- 避免对未更改的内容重新计算嵌入
所有这些都应该节省您的时间和金钱,并改善您的向量搜索结果。
至关重要的是,索引 API 即使对于相对于原始源文档经过多个转换步骤(例如,通过文本分块)的文档也有效。
工作原理
LangChain 索引使用记录管理器 (RecordManager
) 来跟踪文档写入向量存储的操作。
在索引内容时,会为每个文档计算哈希值,并将以下信息存储在记录管理器中
- 文档哈希值(页面内容和元数据的哈希值)
- 写入时间
- 源 ID - 每个文档的元数据中应包含信息,以便我们确定此文档的最终来源
删除模式
在将文档索引到向量存储时,向量存储中可能存在一些应该删除的现有文档。在某些情况下,您可能希望删除任何与新索引文档来自相同来源的现有文档。在其他情况下,您可能希望批量删除所有现有文档。索引 API 删除模式允许您选择所需的行为。
清理模式 | 去重内容 | 可并行化 | 清理已删除的源文档 | 清理源文档和/或派生文档的更改 | 清理时机 |
---|---|---|---|---|---|
无 | ✅ | ✅ | ❌ | ❌ | - |
增量 | ✅ | ✅ | ❌ | ✅ | 持续 |
完整 | ✅ | ❌ | ✅ | ✅ | 索引结束后 |
范围完整 | ✅ | ✅ | ❌ | ✅ | 索引结束后 |
None
不会执行任何自动清理,允许用户手动清理旧内容。
incremental
、full
和 scoped_full
提供以下自动化清理:
- 如果源文档或派生文档的内容发生更改,所有 3 种模式都将清理(删除)以前版本的内容。
- 如果源文档已被删除(意味着它不包含在当前正在索引的文档中),则
full
清理模式将从向量存储中正确删除它,但incremental
和scoped_full
模式不会。
当内容发生更改时(例如,源 PDF 文件被修订),在索引期间会有一段时间,新版本和旧版本都可能返回给用户。这发生在写入新内容之后,但在删除旧版本之前。
incremental
索引可以最大限度地减少这段时间,因为它能够在写入时持续清理。full
和scoped_full
模式在所有批次写入完成后进行清理。
要求
- 不要将其与独立于索引 API 预先填充内容的存储一起使用,因为记录管理器不会知道以前已插入的记录。
- 仅适用于支持以下功能的 LangChain
vectorstore
:- 按 ID 添加文档(带有
ids
参数的add_documents
方法) - 按 ID 删除(带有
ids
参数的delete
方法)
- 按 ID 添加文档(带有
兼容的向量存储:Aerospike
、AnalyticDB
、AstraDB
、AwaDB
、AzureCosmosDBNoSqlVectorSearch
、AzureCosmosDBVectorSearch
、Bagel
、Cassandra
、Chroma
、CouchbaseVectorStore
、DashVector
、DatabricksVectorSearch
、DeepLake
、Dingo
、ElasticVectorSearch
、ElasticsearchStore
、FAISS
、HanaDB
、Milvus
、MongoDBAtlasVectorSearch
、MyScale
、OpenSearchVectorSearch
、PGVector
、Pinecone
、Qdrant
、Redis
、Rockset
、ScaNN
、SingleStoreDB
、SupabaseVectorStore
、SurrealDBStore
、TimescaleVector
、Vald
、VDMS
、Vearch
、VespaStore
、Weaviate
、Yellowbrick
、ZepVectorStore
、TencentVectorDB
、OpenSearchVectorSearch
。
注意
记录管理器依赖于基于时间的机制来确定可以清理哪些内容(当使用 full
、incremental
或 scoped_full
清理模式时)。
如果两个任务背靠背运行,并且第一个任务在时钟时间发生变化之前完成,则第二个任务可能无法清理内容。
在实际设置中,这不太可能成为问题,原因如下:
- 记录管理器使用更高分辨率的时间戳。
- 数据需要在第一个和第二个任务运行之间发生更改,如果任务之间的时间间隔很小,则这种情况不太可能发生。
- 索引任务通常需要几毫秒以上的时间。
快速入门
from langchain.indexes import SQLRecordManager, index
from langchain_core.documents import Document
from langchain_elasticsearch import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings
初始化向量存储并设置嵌入
collection_name = "test_index"
embedding = OpenAIEmbeddings()
vectorstore = ElasticsearchStore(
es_url="https://127.0.0.1:9200", index_name="test_index", embedding=embedding
)
使用适当的命名空间初始化记录管理器。
建议: 使用一个命名空间,其中同时考虑向量存储和向量存储中的集合名称;例如,“redis/my_docs”、“chromadb/my_docs”或“postgres/my_docs”。
namespace = f"elasticsearch/{collection_name}"
record_manager = SQLRecordManager(
namespace, db_url="sqlite:///record_manager_cache.sql"
)
在使用记录管理器之前创建模式。
record_manager.create_schema()
让我们索引一些测试文档
doc1 = Document(page_content="kitty", metadata={"source": "kitty.txt"})
doc2 = Document(page_content="doggy", metadata={"source": "doggy.txt"})
索引到空的向量存储中
def _clear():
"""Hacky helper method to clear content. See the `full` mode section to to understand why it works."""
index([], record_manager, vectorstore, cleanup="full", source_id_key="source")
None
删除模式
此模式不执行旧版本内容的自动清理;但是,它仍然会处理内容去重。
_clear()
index(
[doc1, doc1, doc1, doc1, doc1],
record_manager,
vectorstore,
cleanup=None,
source_id_key="source",
)
{'num_added': 1, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
_clear()
index([doc1, doc2], record_manager, vectorstore, cleanup=None, source_id_key="source")
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
第二次运行时,所有内容都将被跳过
index([doc1, doc2], record_manager, vectorstore, cleanup=None, source_id_key="source")
{'num_added': 0, 'num_updated': 0, 'num_skipped': 2, 'num_deleted': 0}
"incremental"
删除模式
_clear()
index(
[doc1, doc2],
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
再次索引应导致两个文档都被跳过——也会跳过嵌入操作!
index(
[doc1, doc2],
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
{'num_added': 0, 'num_updated': 0, 'num_skipped': 2, 'num_deleted': 0}
如果我们没有提供任何带有增量索引模式的文档,则不会发生任何更改。
index([], record_manager, vectorstore, cleanup="incremental", source_id_key="source")
{'num_added': 0, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
如果我们修改一个文档,新版本将被写入,并且所有共享相同来源的旧版本将被删除。
changed_doc_2 = Document(page_content="puppy", metadata={"source": "doggy.txt"})
index(
[changed_doc_2],
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
{'num_added': 1, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 1}
"full"
删除模式
在 full
模式下,用户应将应索引到索引函数中的完整内容传递给索引函数。
任何未传递到索引函数中并且存在于向量存储中的文档都将被删除!
此行为对于处理源文档的删除非常有用。
_clear()
all_docs = [doc1, doc2]
index(all_docs, record_manager, vectorstore, cleanup="full", source_id_key="source")
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
假设有人删除了第一个文档
del all_docs[0]
all_docs
[Document(page_content='doggy', metadata={'source': 'doggy.txt'})]
使用完整模式也将清理已删除的内容。
index(all_docs, record_manager, vectorstore, cleanup="full", source_id_key="source")
{'num_added': 0, 'num_updated': 0, 'num_skipped': 1, 'num_deleted': 1}
来源
元数据属性包含一个名为 source
的字段。此来源应指向与给定文档关联的最终来源。
例如,如果这些文档表示某些父文档的块,则两个文档的 source
应该相同,并引用父文档。
通常,应该始终指定 source
。仅当您从不打算使用 incremental
模式,并且由于某种原因无法正确指定 source
字段时,才使用 None
。
from langchain_text_splitters import CharacterTextSplitter
doc1 = Document(
page_content="kitty kitty kitty kitty kitty", metadata={"source": "kitty.txt"}
)
doc2 = Document(page_content="doggy doggy the doggy", metadata={"source": "doggy.txt"})
new_docs = CharacterTextSplitter(
separator="t", keep_separator=True, chunk_size=12, chunk_overlap=2
).split_documents([doc1, doc2])
new_docs
[Document(page_content='kitty kit', metadata={'source': 'kitty.txt'}),
Document(page_content='tty kitty ki', metadata={'source': 'kitty.txt'}),
Document(page_content='tty kitty', metadata={'source': 'kitty.txt'}),
Document(page_content='doggy doggy', metadata={'source': 'doggy.txt'}),
Document(page_content='the doggy', metadata={'source': 'doggy.txt'})]
_clear()
index(
new_docs,
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
{'num_added': 5, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
changed_doggy_docs = [
Document(page_content="woof woof", metadata={"source": "doggy.txt"}),
Document(page_content="woof woof woof", metadata={"source": "doggy.txt"}),
]
这应删除与 doggy.txt
来源关联的旧版本文档,并将其替换为新版本。
index(
changed_doggy_docs,
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 2}
vectorstore.similarity_search("dog", k=30)
[Document(page_content='woof woof', metadata={'source': 'doggy.txt'}),
Document(page_content='woof woof woof', metadata={'source': 'doggy.txt'}),
Document(page_content='tty kitty', metadata={'source': 'kitty.txt'}),
Document(page_content='tty kitty ki', metadata={'source': 'kitty.txt'}),
Document(page_content='kitty kit', metadata={'source': 'kitty.txt'})]
与加载器一起使用
索引可以接受文档的可迭代对象或任何加载器。
注意: 加载器必须正确设置源键。
from langchain_core.document_loaders import BaseLoader
class MyCustomLoader(BaseLoader):
def lazy_load(self):
text_splitter = CharacterTextSplitter(
separator="t", keep_separator=True, chunk_size=12, chunk_overlap=2
)
docs = [
Document(page_content="woof woof", metadata={"source": "doggy.txt"}),
Document(page_content="woof woof woof", metadata={"source": "doggy.txt"}),
]
yield from text_splitter.split_documents(docs)
def load(self):
return list(self.lazy_load())
_clear()
loader = MyCustomLoader()
loader.load()
[Document(page_content='woof woof', metadata={'source': 'doggy.txt'}),
Document(page_content='woof woof woof', metadata={'source': 'doggy.txt'})]
index(loader, record_manager, vectorstore, cleanup="full", source_id_key="source")
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
vectorstore.similarity_search("dog", k=30)
[Document(page_content='woof woof', metadata={'source': 'doggy.txt'}),
Document(page_content='woof woof woof', metadata={'source': 'doggy.txt'})]