跳到主要内容
Open In ColabOpen on GitHub

Amazon Neptune with SPARQL

Amazon Neptune 是一个高性能的图分析和无服务器数据库,具有卓越的可扩展性和可用性。

此示例展示了 QA 链如何使用 资源描述框架 (RDF) 数据在 Amazon Neptune 图数据库中查询资源描述框架 (RDF) 数据,并返回人类可读的响应。

SPARQL 是 RDF 图的标准查询语言。

此示例使用 NeptuneRdfGraph 类连接 Neptune 数据库并加载其模式。 create_neptune_sparql_qa_chain 用于连接图和 LLM 以提出自然语言问题。

此笔记本演示了使用组织数据的示例。

运行此笔记本的要求

  • 可从此笔记本访问的 Neptune 1.2.x 集群
  • 具有 Python 3.9 或更高版本的内核
  • 对于 Bedrock 访问,请确保 IAM 角色具有此策略
{
"Action": [
"bedrock:ListFoundationModels",
"bedrock:InvokeModel"
],
"Resource": "*",
"Effect": "Allow"
}
  • 用于暂存示例数据的 S3 存储桶。该存储桶应与 Neptune 位于同一账户/区域中。

设置

播种 W3C 组织数据

播种 W3C 组织数据,W3C 组织本体以及一些实例。

您将需要一个与 Neptune 集群位于同一区域和账户中的 S3 存储桶。将 STAGE_BUCKET 设置为该存储桶的名称。

STAGE_BUCKET = "<bucket-name>"
%%bash  -s "$STAGE_BUCKET"

rm -rf data
mkdir -p data
cd data
echo getting org ontology and sample org instances
wget http://www.w3.org/ns/org.ttl
wget https://raw.githubusercontent.com/aws-samples/amazon-neptune-ontology-example-blog/main/data/example_org.ttl

echo Copying org ttl to S3
aws s3 cp org.ttl s3://$1/org.ttl
aws s3 cp example_org.ttl s3://$1/example_org.ttl

我们将使用 graph-notebook 包中的 %load magic 命令将 W3C 数据插入到 Neptune 图中。在运行 %load 之前,请使用 %%graph_notebook_config 设置图连接参数。

!pip install --upgrade --quiet graph-notebook
%load_ext graph_notebook.magics
%%graph_notebook_config
{
"host": "<neptune-endpoint>",
"neptune_service": "neptune-db",
"port": 8182,
"auth_mode": "<[DEFAULT|IAM]>",
"load_from_s3_arn": "<neptune-cluster-load-role-arn>",
"ssl": true,
"aws_region": "<region>"
}

批量加载 org ttl - 包括本体和实例。

%load -s s3://{STAGE_BUCKET} -f turtle --store-to loadres --run
%load_status {loadres['payload']['loadId']} --errors --details

设置链

!pip install --upgrade --quiet langchain-aws

** 重启内核 **

准备一个示例

EXAMPLES = """

<question>
Find organizations.
</question>

<sparql>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX org: <http://www.w3.org/ns/org#>

select ?org ?orgName where {{
?org rdfs:label ?orgName .
}}
</sparql>

<question>
Find sites of an organization
</question>

<sparql>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX org: <http://www.w3.org/ns/org#>

select ?org ?orgName ?siteName where {{
?org rdfs:label ?orgName .
?org org:hasSite/rdfs:label ?siteName .
}}
</sparql>

<question>
Find suborganizations of an organization
</question>

<sparql>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX org: <http://www.w3.org/ns/org#>

select ?org ?orgName ?subName where {{
?org rdfs:label ?orgName .
?org org:hasSubOrganization/rdfs:label ?subName .
}}
</sparql>

<question>
Find organizational units of an organization
</question>

<sparql>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX org: <http://www.w3.org/ns/org#>

select ?org ?orgName ?unitName where {{
?org rdfs:label ?orgName .
?org org:hasUnit/rdfs:label ?unitName .
}}
</sparql>

<question>
Find members of an organization. Also find their manager, or the member they report to.
</question>

<sparql>
PREFIX org: <http://www.w3.org/ns/org#>
PREFIX foaf: <http://xmlns.com/foaf/0.1/>

select * where {{
?person rdf:type foaf:Person .
?person org:memberOf ?org .
OPTIONAL {{ ?person foaf:firstName ?firstName . }}
OPTIONAL {{ ?person foaf:family_name ?lastName . }}
OPTIONAL {{ ?person org:reportsTo ??manager }} .
}}
</sparql>


<question>
Find change events, such as mergers and acquisitions, of an organization
</question>

<sparql>
PREFIX org: <http://www.w3.org/ns/org#>

select ?event ?prop ?obj where {{
?org rdfs:label ?orgName .
?event rdf:type org:ChangeEvent .
?event org:originalOrganization ?origOrg .
?event org:resultingOrganization ?resultingOrg .
}}
</sparql>

"""

创建 Neptune 数据库 RDF 图

from langchain_aws.graphs import NeptuneRdfGraph

host = "<your host>"
port = 8182 # change if different
region = "us-east-1" # change if different
graph = NeptuneRdfGraph(host=host, port=port, use_iam_auth=True, region_name=region)

# Optionally, change the schema
# elems = graph.get_schema_elements
# change elems ...
# graph.load_schema(elems)
API 参考:NeptuneRdfGraph

使用 Neptune SPARQL QA 链

此 QA 链使用 SPARQL 查询 Neptune 图数据库,并返回人类可读的响应。

from langchain_aws import ChatBedrockConverse
from langchain_aws.chains import create_neptune_sparql_qa_chain

MODEL_ID = "anthropic.claude-3-5-sonnet-20241022-v2:0"
llm = ChatBedrockConverse(
model_id=MODEL_ID,
temperature=0,
)

chain = create_neptune_sparql_qa_chain(
llm=llm,
graph=graph,
examples=EXAMPLES,
)

result = chain.invoke("How many organizations are in the graph?")
print(result["result"].content)

以下是一些可以在已摄取的图数据上尝试的更多提示。

result = chain.invoke("Are there any mergers or acquisitions?")
print(result["result"].content)
result = chain.invoke("Find organizations.")
print(result["result"].content)
result = chain.invoke("Find sites of MegaSystems or MegaFinancial.")
print(result["result"].content)
result = chain.invoke("Find a member who is a manager of one or more members.")
print(result["result"].content)
result = chain.invoke("Find five members and their managers.")
print(result["result"].content)
result = chain.invoke(
"Find org units or suborganizations of The Mega Group. What are the sites of those units?"
)
print(result["result"].content)

添加消息历史记录

Neptune SPARQL QA 链可以被 RunnableWithMessageHistory 包裹。 这会将消息历史记录添加到链中,从而使我们能够创建一个可以在多次调用中保留对话状态的聊天机器人。

首先,我们需要一种存储和加载消息历史记录的方法。 为此,每个线程都将创建为 InMemoryChatMessageHistory 的实例,并存储到字典中以便重复访问。

(另请参阅:https://python.langchain.ac.cn/docs/versions/migrating_memory/chat_history/#chatmessagehistory)

from langchain_core.chat_history import InMemoryChatMessageHistory

chats_by_session_id = {}


def get_chat_history(session_id: str) -> InMemoryChatMessageHistory:
chat_history = chats_by_session_id.get(session_id)
if chat_history is None:
chat_history = InMemoryChatMessageHistory()
chats_by_session_id[session_id] = chat_history
return chat_history

现在,QA 链和消息历史记录存储可以用于创建新的 RunnableWithMessageHistory。 请注意,我们必须将 query 设置为输入键,以匹配基础链期望的格式。

from langchain_core.runnables.history import RunnableWithMessageHistory

runnable_with_history = RunnableWithMessageHistory(
chain,
get_chat_history,
input_messages_key="query",
)

在调用链之前,需要为新 InMemoryChatMessageHistory 将记住的对话生成唯一的 session_id

import uuid

session_id = uuid.uuid4()

最后,使用 session_id 调用启用消息历史记录的链。

result = runnable_with_history.invoke(
{"query": "How many org units or suborganizations does the The Mega Group have?"},
config={"configurable": {"session_id": session_id}},
)
print(result["result"].content)

当使用相同的 session_id 继续调用链时,将在对话中先前查询的上下文中返回响应。

result = runnable_with_history.invoke(
{"query": "List the sites for each of the units."},
config={"configurable": {"session_id": session_id}},
)
print(result["result"].content)

此页是否对您有帮助?