Valthera
启用 AI 代理在用户最有可能响应时与他们互动。
概述
Valthera 是一个开源框架,使 LLM 代理能够以更有意义的方式与用户互动。它基于 BJ Fogg 的行为模型 (B=MAT) 构建,并利用来自多个来源(例如 HubSpot、PostHog 和 Snowflake)的数据来评估用户的动机和能力,然后在触发操作之前进行评估。
在本指南中,您将学习
- 核心概念: 组件概述(数据聚合器、评分器、推理引擎和触发器生成器)。
- 系统架构: 数据如何在系统中流动以及如何做出决策。
- 自定义: 如何扩展连接器、评分指标和决策规则以满足您的需求。
开始吧!
设置
本节介绍依赖项的安装以及为 Valthera 设置自定义数据连接器。
pip install openai langchain langchain_openai valthera langchain_valthera langgraph
from typing import Any, Dict, List
from valthera.connectors.base import BaseConnector
class MockHubSpotConnector(BaseConnector):
"""
Simulates data retrieval from HubSpot. Provides information such as lead score,
lifecycle stage, and marketing metrics.
"""
def get_user_data(self, user_id: str) -> Dict[str, Any]:
"""
Retrieve mock HubSpot data for a given user.
Args:
user_id: The unique identifier for the user
Returns:
A dictionary containing HubSpot user data
"""
return {
"hubspot_contact_id": "999-ZZZ",
"lifecycle_stage": "opportunity",
"lead_status": "engaged",
"hubspot_lead_score": 100,
"company_name": "MaxMotivation Corp.",
"last_contacted_date": "2023-09-20",
"hubspot_marketing_emails_opened": 20,
"marketing_emails_clicked": 10,
}
class MockPostHogConnector(BaseConnector):
"""
Simulates data retrieval from PostHog. Provides session data and engagement events.
"""
def get_user_data(self, user_id: str) -> Dict[str, Any]:
"""
Retrieve mock PostHog data for a given user.
Args:
user_id: The unique identifier for the user
Returns:
A dictionary containing PostHog user data
"""
return {
"distinct_ids": [user_id, f"email_{user_id}"],
"last_event_timestamp": "2023-09-20T12:34:56Z",
"feature_flags": ["beta_dashboard", "early_access"],
"posthog_session_count": 30,
"avg_session_duration_sec": 400,
"recent_event_types": ["pageview", "button_click", "premium_feature_used"],
"posthog_events_count_past_30days": 80,
"posthog_onboarding_steps_completed": 5,
}
class MockSnowflakeConnector(BaseConnector):
"""
Simulates retrieval of additional user profile data from Snowflake.
"""
def get_user_data(self, user_id: str) -> Dict[str, Any]:
"""
Retrieve mock Snowflake data for a given user.
Args:
user_id: The unique identifier for the user
Returns:
A dictionary containing Snowflake user data
"""
return {
"user_id": user_id,
"email": f"{user_id}@example.com",
"subscription_status": "paid",
"plan_tier": "premium",
"account_creation_date": "2023-01-01",
"preferred_language": "en",
"last_login_datetime": "2023-09-20T12:00:00Z",
"behavior_complexity": 3,
}
实例化
在本节中,我们将实例化核心组件。首先,我们创建一个数据聚合器以合并来自自定义连接器的数据。然后,我们配置动机和能力的评分指标。
from valthera.aggregator import DataAggregator
# Constants for configuration
LEAD_SCORE_MAX = 100
EVENTS_COUNT_MAX = 50
EMAILS_OPENED_FACTOR = 10.0
SESSION_COUNT_FACTOR_1 = 5.0
ONBOARDING_STEPS_FACTOR = 5.0
SESSION_COUNT_FACTOR_2 = 10.0
BEHAVIOR_COMPLEXITY_MAX = 5.0
# Initialize data aggregator
data_aggregator = DataAggregator(
connectors={
"hubspot": MockHubSpotConnector(),
"posthog": MockPostHogConnector(),
"snowflake": MockSnowflakeConnector(),
}
)
# You can now fetch unified user data by calling data_aggregator.get_user_context(user_id)
from typing import Callable, Union
from valthera.scorer import ValtheraScorer
# Define transform functions with proper type annotations
def transform_lead_score(x: Union[int, float]) -> float:
"""Transform lead score to a value between 0 and 1."""
return min(x, LEAD_SCORE_MAX) / LEAD_SCORE_MAX
def transform_events_count(x: Union[int, float]) -> float:
"""Transform events count to a value between 0 and 1."""
return min(x, EVENTS_COUNT_MAX) / EVENTS_COUNT_MAX
def transform_emails_opened(x: Union[int, float]) -> float:
"""Transform emails opened to a value between 0 and 1."""
return min(x / EMAILS_OPENED_FACTOR, 1.0)
def transform_session_count_1(x: Union[int, float]) -> float:
"""Transform session count for motivation to a value between 0 and 1."""
return min(x / SESSION_COUNT_FACTOR_1, 1.0)
def transform_onboarding_steps(x: Union[int, float]) -> float:
"""Transform onboarding steps to a value between 0 and 1."""
return min(x / ONBOARDING_STEPS_FACTOR, 1.0)
def transform_session_count_2(x: Union[int, float]) -> float:
"""Transform session count for ability to a value between 0 and 1."""
return min(x / SESSION_COUNT_FACTOR_2, 1.0)
def transform_behavior_complexity(x: Union[int, float]) -> float:
"""Transform behavior complexity to a value between 0 and 1."""
return 1 - (min(x, BEHAVIOR_COMPLEXITY_MAX) / BEHAVIOR_COMPLEXITY_MAX)
# Scoring configuration for user motivation
motivation_config = [
{"key": "hubspot_lead_score", "weight": 0.30, "transform": transform_lead_score},
{
"key": "posthog_events_count_past_30days",
"weight": 0.30,
"transform": transform_events_count,
},
{
"key": "hubspot_marketing_emails_opened",
"weight": 0.20,
"transform": transform_emails_opened,
},
{
"key": "posthog_session_count",
"weight": 0.20,
"transform": transform_session_count_1,
},
]
# Scoring configuration for user ability
ability_config = [
{
"key": "posthog_onboarding_steps_completed",
"weight": 0.30,
"transform": transform_onboarding_steps,
},
{
"key": "posthog_session_count",
"weight": 0.30,
"transform": transform_session_count_2,
},
{
"key": "behavior_complexity",
"weight": 0.40,
"transform": transform_behavior_complexity,
},
]
# Instantiate the scorer
scorer = ValtheraScorer(motivation_config, ability_config)
调用
接下来,我们设置推理引擎和触发器生成器,然后通过实例化 Valthera 工具将所有组件组合在一起。最后,我们执行代理工作流程以处理输入消息。
import os
from langchain_openai import ChatOpenAI
from valthera.reasoning_engine import ReasoningEngine
# Define threshold as constant
SCORE_THRESHOLD = 0.75
# Function to safely get API key
def get_openai_api_key() -> str:
"""Get OpenAI API key with error handling."""
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
raise ValueError("OPENAI_API_KEY not found in environment variables")
return api_key
# Decision rules using constant
decision_rules = [
{
"condition": f"motivation >= {SCORE_THRESHOLD} and ability >= {SCORE_THRESHOLD}",
"action": "trigger",
"description": "Both scores are high enough.",
},
{
"condition": f"motivation < {SCORE_THRESHOLD}",
"action": "improve_motivation",
"description": "User motivation is low.",
},
{
"condition": f"ability < {SCORE_THRESHOLD}",
"action": "improve_ability",
"description": "User ability is low.",
},
{
"condition": "otherwise",
"action": "defer",
"description": "No action needed at this time.",
},
]
try:
api_key = get_openai_api_key()
reasoning_engine = ReasoningEngine(
llm=ChatOpenAI(
model_name="gpt-4-turbo", temperature=0.0, openai_api_key=api_key
),
decision_rules=decision_rules,
)
except ValueError as e:
print(f"Error initializing reasoning engine: {e}")
API 参考:ChatOpenAI
from valthera.trigger_generator import TriggerGenerator
try:
api_key = get_openai_api_key() # Reuse the function for consistency
trigger_generator = TriggerGenerator(
llm=ChatOpenAI(
model_name="gpt-4-turbo", temperature=0.7, openai_api_key=api_key
)
)
except ValueError as e:
print(f"Error initializing trigger generator: {e}")
from langchain_valthera.tools import ValtheraTool
from langgraph.prebuilt import create_react_agent
try:
api_key = get_openai_api_key()
# Initialize Valthera tool
valthera_tool = ValtheraTool(
data_aggregator=data_aggregator,
motivation_config=motivation_config,
ability_config=ability_config,
reasoning_engine=reasoning_engine,
trigger_generator=trigger_generator,
)
# Create agent with LLM
llm = ChatOpenAI(model_name="gpt-4-turbo", temperature=0.0, openai_api_key=api_key)
tools = [valthera_tool]
graph = create_react_agent(llm, tools=tools)
# Define input message for testing
inputs = {
"messages": [("user", "Evaluate behavior for user_12345: Finish Onboarding")]
}
# Process the input and display responses
print("Running Valthera agent workflow...")
for response in graph.stream(inputs, stream_mode="values"):
print(response)
except Exception as e:
print(f"Error running Valthera workflow: {e}")
API 参考:create_react_agent
链接
此集成目前不支持链接操作。未来的版本可能会包含链接支持。
API 参考
以下是 Valthera 集成提供的关键 API 的概述
- 数据聚合器: 使用
data_aggregator.get_user_context(user_id)
获取聚合的用户数据。 - 评分器:
ValtheraScorer
根据提供的配置计算动机和能力得分。 - 推理引擎:
ReasoningEngine
评估决策规则以确定适当的操作(触发、提高动机、提高能力或延迟)。 - 触发器生成器: 使用 LLM 生成个性化的触发消息。
- Valthera 工具: 集成所有组件以处理输入并执行代理工作流程。
有关详细用法,请参阅源代码中的内联文档。