跳到主要内容
Open In ColabOpen on GitHub

Kafka

Kafka 是一个分布式消息系统,用于发布和订阅记录流。本演示展示了如何使用 KafkaChatMessageHistory 来存储和检索 Kafka 集群中的聊天消息。

运行此演示需要一个正在运行的 Kafka 集群。您可以按照此说明在本地创建一个 Kafka 集群。

from langchain_community.chat_message_histories import KafkaChatMessageHistory

chat_session_id = "chat-message-history-kafka"
bootstrap_servers = "localhost:64797" # host:port. `localhost:Plaintext Ports` if setup Kafka cluster locally
history = KafkaChatMessageHistory(
chat_session_id,
bootstrap_servers,
)

构建 KafkaChatMessageHistory 的可选参数

  • ttl_ms:聊天消息的生存时间,以毫秒为单位。
  • partition:用于存储聊天消息的主题分区数。
  • replication_factor:用于存储聊天消息的主题复制因子。

KafkaChatMessageHistory 内部使用 Kafka 消费者来读取聊天消息,并且它能够持久地标记消费位置。它具有以下方法来检索聊天消息

  • messages:继续从最后一个消息消费聊天消息。
  • messages_from_beginning:将消费者重置到历史记录的开头并消费消息。可选参数
    1. max_message_count:要读取的最大消息数。
    2. max_time_sec:读取消息的最大时间,以秒为单位。
  • messages_from_latest:将消费者重置到聊天历史记录的末尾,并尝试消费消息。可选参数与上面相同。
  • messages_from_last_consumed:返回从上次消费的消息开始继续的消息,类似于 messages,但带有可选参数。

max_message_countmax_time_sec 用于避免在检索消息时无限期阻塞。因此,messages 和其他检索消息的方法可能不会返回聊天历史记录中的所有消息。您需要指定 max_message_countmax_time_sec 以在单个批次中检索所有聊天历史记录。

添加消息并检索。

history.add_user_message("hi!")
history.add_ai_message("whats up?")

history.messages
[HumanMessage(content='hi!'), AIMessage(content='whats up?')]

再次调用 messages 返回一个空列表,因为消费者位于聊天历史记录的末尾。

history.messages
[]

添加新消息并继续消费。

history.add_user_message("hi again!")
history.add_ai_message("whats up again?")
history.messages
[HumanMessage(content='hi again!'), AIMessage(content='whats up again?')]

要重置消费者并从头开始读取

history.messages_from_beginning()
[HumanMessage(content='hi again!'),
AIMessage(content='whats up again?'),
HumanMessage(content='hi!'),
AIMessage(content='whats up?')]

将消费者设置为聊天历史记录的末尾,添加一些新消息,并消费

history.messages_from_latest()
history.add_user_message("HI!")
history.add_ai_message("WHATS UP?")
history.messages
[HumanMessage(content='HI!'), AIMessage(content='WHATS UP?')]

此页是否对您有帮助?