Cassandra 数据库工具包
Apache Cassandra®
是一种广泛用于存储事务性应用程序数据的数据库。大型语言模型中函数和 >工具的引入为生成式 AI 应用程序中的现有数据开辟了一些激动人心的用例。
Cassandra 数据库
工具包使 AI 工程师能够有效地将代理与 Cassandra 数据集成,提供以下功能
- 通过优化的查询实现快速数据访问。大多数查询应该在不到 10 毫秒的时间内完成。
- 模式内省,以增强 LLM 推理能力
- 与各种 Cassandra 部署兼容,包括 Apache Cassandra®、DataStax Enterprise™ 和 DataStax Astra™
- 目前,该工具包仅限于 SELECT 查询和模式内省操作。(安全第一)
有关创建 Cassandra DB 代理的更多信息,请参阅 CQL 代理食谱
快速入门
- 安装
cassio
库 - 为要连接到的 Cassandra 数据库设置环境变量
- 初始化
CassandraDatabase
- 使用
toolkit.get_tools()
将工具传递给你的代理 - 坐下来,看着它为你完成所有工作
操作理论
Cassandra 查询语言 (CQL)
是与 Cassandra 数据库交互的主要以人为本的方式。虽然在生成查询时提供了一些灵活性,但它需要了解 Cassandra 数据建模的最佳实践。LLM 函数调用使代理能够推理,然后选择一个工具来满足请求。使用 LLM 的代理应该在选择适当的工具包或工具包链时使用 Cassandra 特定的逻辑进行推理。这减少了在 LLM 被迫提供自上而下的解决方案时引入的随机性。你希望 LLM 完全不受限制地访问你的数据库吗?是的。可能不会。为此,我们提供了一个提示,用于在构建代理问题时使用
你是一个 Apache Cassandra 专家查询分析机器人,具有以下功能和规则
- 你将从最终用户那里获取有关在数据库中查找特定数据的问题。
- 你将检查数据库的模式并创建一个查询路径。
- 你将向用户提供查找他们要查找数据的正确查询,并显示查询路径提供的步骤。
- 你将使用最佳实践来使用分区键和聚类列查询 Apache Cassandra。
- 避免在查询中使用 ALLOW FILTERING。
- 目标是找到一个查询路径,因此可能需要查询其他表才能获得最终答案。
以下是一个 JSON 格式的查询路径示例
{
"query_paths": [
{
"description": "Direct query to users table using email",
"steps": [
{
"table": "user_credentials",
"query":
"SELECT userid FROM user_credentials WHERE email = '[email protected]';"
},
{
"table": "users",
"query": "SELECT * FROM users WHERE userid = ?;"
}
]
}
]
}
提供的工具
cassandra_db_schema
收集已连接数据库或特定模式的所有模式信息。这对代理在确定操作时至关重要。
cassandra_db_select_table_data
从特定 keyspace 和表中选择数据。代理可以为谓词和返回记录数量的限制传递参数。
cassandra_db_query
cassandra_db_select_table_data
的实验性替代方案,它接受由代理完全形成的查询字符串,而不是参数。警告:这会导致不寻常的查询,这些查询可能没有那么高效(甚至无法工作)。这可能会在将来的版本中删除。如果它做了一些很酷的事情,我们也想知道。你永远不知道!
环境设置
安装以下 Python 模块
pip install ipykernel python-dotenv cassio langchain_openai langchain langchain-community langchainhub
.env 文件
连接通过 cassio
使用 auto=True
参数,并且笔记本使用 OpenAI。你应该相应地创建一个 .env
文件。
对于 Casssandra,设置
CASSANDRA_CONTACT_POINTS
CASSANDRA_USERNAME
CASSANDRA_PASSWORD
CASSANDRA_KEYSPACE
对于 Astra,设置
ASTRA_DB_APPLICATION_TOKEN
ASTRA_DB_DATABASE_ID
ASTRA_DB_KEYSPACE
例如
# Connection to Astra:
ASTRA_DB_DATABASE_ID=a1b2c3d4-...
ASTRA_DB_APPLICATION_TOKEN=AstraCS:...
ASTRA_DB_KEYSPACE=notebooks
# Also set
OPENAI_API_KEY=sk-....
(你也可以修改以下代码以直接连接 cassio
。)
from dotenv import load_dotenv
load_dotenv(override=True)
# Import necessary libraries
import os
import cassio
from langchain import hub
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_community.agent_toolkits.cassandra_database.toolkit import (
CassandraDatabaseToolkit,
)
from langchain_community.tools.cassandra_database.prompt import QUERY_PATH_PROMPT
from langchain_community.utilities.cassandra_database import CassandraDatabase
from langchain_openai import ChatOpenAI
连接到 Cassandra 数据库
cassio.init(auto=True)
session = cassio.config.resolve_session()
if not session:
raise Exception(
"Check environment configuration or manually configure cassio connection parameters"
)
# Test data pep
session = cassio.config.resolve_session()
session.execute("""DROP KEYSPACE IF EXISTS langchain_agent_test; """)
session.execute(
"""
CREATE KEYSPACE if not exists langchain_agent_test
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
"""
)
session.execute(
"""
CREATE TABLE IF NOT EXISTS langchain_agent_test.user_credentials (
user_email text PRIMARY KEY,
user_id UUID,
password TEXT
);
"""
)
session.execute(
"""
CREATE TABLE IF NOT EXISTS langchain_agent_test.users (
id UUID PRIMARY KEY,
name TEXT,
email TEXT
);"""
)
session.execute(
"""
CREATE TABLE IF NOT EXISTS langchain_agent_test.user_videos (
user_id UUID,
video_id UUID,
title TEXT,
description TEXT,
PRIMARY KEY (user_id, video_id)
);
"""
)
user_id = "522b1fe2-2e36-4cef-a667-cd4237d08b89"
video_id = "27066014-bad7-9f58-5a30-f63fe03718f6"
session.execute(
f"""
INSERT INTO langchain_agent_test.user_credentials (user_id, user_email)
VALUES ({user_id}, '[email protected]');
"""
)
session.execute(
f"""
INSERT INTO langchain_agent_test.users (id, name, email)
VALUES ({user_id}, 'Patrick McFadin', '[email protected]');
"""
)
session.execute(
f"""
INSERT INTO langchain_agent_test.user_videos (user_id, video_id, title)
VALUES ({user_id}, {video_id}, 'Use Langflow to Build a LangChain LLM Application in 5 Minutes');
"""
)
session.set_keyspace("langchain_agent_test")
# Create a CassandraDatabase instance
# Uses the cassio session to connect to the database
db = CassandraDatabase()
# Choose the LLM that will drive the agent
# Only certain models support this
llm = ChatOpenAI(temperature=0, model="gpt-4-1106-preview")
toolkit = CassandraDatabaseToolkit(db=db)
tools = toolkit.get_tools()
print("Available tools:")
for tool in tools:
print(tool.name + "\t- " + tool.description)
Available tools:
cassandra_db_schema -
Input to this tool is a keyspace name, output is a table description
of Apache Cassandra tables.
If the query is not correct, an error message will be returned.
If an error is returned, report back to the user that the keyspace
doesn't exist and stop.
cassandra_db_query -
Execute a CQL query against the database and get back the result.
If the query is not correct, an error message will be returned.
If an error is returned, rewrite the query, check the query, and try again.
cassandra_db_select_table_data -
Tool for getting data from a table in an Apache Cassandra database.
Use the WHERE clause to specify the predicate for the query that uses the
primary key. A blank predicate will return all rows. Avoid this if possible.
Use the limit to specify the number of rows to return. A blank limit will
return all rows.
prompt = hub.pull("hwchase17/openai-tools-agent")
# Construct the OpenAI Tools agent
agent = create_openai_tools_agent(llm, tools, prompt)
input = (
QUERY_PATH_PROMPT
+ "\n\nHere is your task: Find all the videos that the user with the email address '[email protected]' has uploaded to the langchain_agent_test keyspace."
)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
response = agent_executor.invoke({"input": input})
print(response["output"])
[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m
Invoking: `cassandra_db_schema` with `{'keyspace': 'langchain_agent_test'}`
[0m[36;1m[1;3mTable Name: user_credentials
- Keyspace: langchain_agent_test
- Columns
- password (text)
- user_email (text)
- user_id (uuid)
- Partition Keys: (user_email)
- Clustering Keys:
Table Name: user_videos
- Keyspace: langchain_agent_test
- Columns
- description (text)
- title (text)
- user_id (uuid)
- video_id (uuid)
- Partition Keys: (user_id)
- Clustering Keys: (video_id asc)
Table Name: users
- Keyspace: langchain_agent_test
- Columns
- email (text)
- id (uuid)
- name (text)
- Partition Keys: (id)
- Clustering Keys:
[0m[32;1m[1;3m
Invoking: `cassandra_db_select_table_data` with `{'keyspace': 'langchain_agent_test', 'table': 'user_credentials', 'predicate': "user_email = '[email protected]'", 'limit': 1}`
[0m[38;5;200m[1;3mRow(user_email='[email protected]', password=None, user_id=UUID('522b1fe2-2e36-4cef-a667-cd4237d08b89'))[0m[32;1m[1;3m
Invoking: `cassandra_db_select_table_data` with `{'keyspace': 'langchain_agent_test', 'table': 'user_videos', 'predicate': 'user_id = 522b1fe2-2e36-4cef-a667-cd4237d08b89', 'limit': 10}`
[0m[38;5;200m[1;3mRow(user_id=UUID('522b1fe2-2e36-4cef-a667-cd4237d08b89'), video_id=UUID('27066014-bad7-9f58-5a30-f63fe03718f6'), description='DataStax Academy is a free resource for learning Apache Cassandra.', title='DataStax Academy')[0m[32;1m[1;3mTo find all the videos that the user with the email address '[email protected]' has uploaded to the `langchain_agent_test` keyspace, we can follow these steps:
1. Query the `user_credentials` table to find the `user_id` associated with the email '[email protected]'.
2. Use the `user_id` obtained from the first step to query the `user_videos` table to retrieve all the videos uploaded by the user.
Here is the query path in JSON format:
```json
{
"query_paths": [
{
"description": "Find user_id from user_credentials and then query user_videos for all videos uploaded by the user",
"steps": [
{
"table": "user_credentials",
"query": "SELECT user_id FROM user_credentials WHERE user_email = '[email protected]';"
},
{
"table": "user_videos",
"query": "SELECT * FROM user_videos WHERE user_id = 522b1fe2-2e36-4cef-a667-cd4237d08b89;"
}
]
}
]
}
按照此查询路径,我们发现具有 user_id 522b1fe2-2e36-4cef-a667-cd4237d08b89
的用户已上传至少一个标题为“DataStax Academy”且描述为“DataStax Academy 是一个学习 Apache Cassandra 的免费资源”的视频。此视频的 video_id 为 27066014-bad7-9f58-5a30-f63fe03718f6
。如果有更多视频,可以使用相同的查询检索它们,如果需要,可能需要增加限制。[0m
[1m> 完成链。[0m 要查找具有电子邮件地址 '[email protected]' 的用户上传到 langchain_agent_test
keyspace 的所有视频,我们可以按照以下步骤操作
- 查询
user_credentials
表以查找与电子邮件 '[email protected]' 关联的user_id
。 - 使用从第一步获得的
user_id
查询user_videos
表以检索用户上传的所有视频。
以下是 JSON 格式的查询路径
{
"query_paths": [
{
"description": "Find user_id from user_credentials and then query user_videos for all videos uploaded by the user",
"steps": [
{
"table": "user_credentials",
"query": "SELECT user_id FROM user_credentials WHERE user_email = '[email protected]';"
},
{
"table": "user_videos",
"query": "SELECT * FROM user_videos WHERE user_id = 522b1fe2-2e36-4cef-a667-cd4237d08b89;"
}
]
}
]
}
按照此查询路径,我们发现具有 user_id 522b1fe2-2e36-4cef-a667-cd4237d08b89
的用户已上传至少一个标题为“DataStax Academy”且描述为“DataStax Academy 是一个学习 Apache Cassandra 的免费资源”的视频。此视频的 video_id 为 27066014-bad7-9f58-5a30-f63fe03718f6
。如果有更多视频,可以使用相同的查询检索它们,如果需要,可能需要增加限制。
## Related
- Tool [conceptual guide](/docs/concepts/#tools)
- Tool [how-to guides](/docs/how_to/#tools)