Handoffs#
Handoff 是一种多智能体设计模式,由 OpenAI 在一个名为 Swarm 的实验项目中引入。其核心思想是让智能体通过特殊的工具调用将任务委托给其他智能体。
我们可以使用 AutoGen Core API 通过事件驱动的智能体来实现 Handoff 模式。使用 AutoGen (v0.4+) 相比 OpenAI 的实现和以前的版本 (v0.2) 具有以下优势:
通过使用分布式智能体运行时,它可以扩展到分布式环境。
它提供了自带智能体实现的灵活性。
原生异步 API 使其易于与 UI 和其他系统集成。
本 Notebook 演示了 Handoff 模式的一个简单实现。建议阅读 主题和订阅 以了解发布-订阅和事件驱动智能体的基本概念。
注意
我们目前正在 AgentChat 中为 Handoff 模式开发高级 API,以便您可以更快地开始使用。
场景#
此场景基于 OpenAI 示例 进行了修改。
考虑一个客户服务场景,客户正试图为某个产品申请退款,或者从聊天机器人处购买新产品。聊天机器人是一个多智能体团队,由三个 AI 智能体和一个人类智能体组成:
分类智能体(Triage Agent),负责理解客户的请求并决定将其转交给哪些其他智能体。
退款智能体(Refund Agent),负责处理退款请求。
销售智能体(Sales Agent),负责处理销售请求。
人类智能体(Human Agent),负责处理 AI 智能体无法处理的复杂请求。
在此场景中,客户通过用户智能体(User Agent)与聊天机器人交互。
下图显示了此场景中智能体的交互拓扑。
让我们使用 AutoGen Core 实现此场景。首先,我们需要导入必要的模块。
import json
import uuid
from typing import List, Tuple
from autogen_core import (
FunctionCall,
MessageContext,
RoutedAgent,
SingleThreadedAgentRuntime,
TopicId,
TypeSubscription,
message_handler,
)
from autogen_core.models import (
AssistantMessage,
ChatCompletionClient,
FunctionExecutionResult,
FunctionExecutionResultMessage,
LLMMessage,
SystemMessage,
UserMessage,
)
from autogen_core.tools import FunctionTool, Tool
from autogen_ext.models.openai import OpenAIChatCompletionClient
from pydantic import BaseModel
消息协议#
在此之前,我们需要定义智能体之间通信的消息协议。我们使用事件驱动的发布-订阅通信,因此这些消息类型将用作事件。
UserLogin是当用户登录并开始新会话时由运行时发布的消息。UserTask是包含用户会话聊天历史记录的消息。当 AI 智能体将任务转交给其他智能体时,它也会发布UserTask消息。AgentResponse是由 AI 智能体和人类智能体发布的消息,它也包含聊天历史记录以及客户回复的主题类型。
class UserLogin(BaseModel):
pass
class UserTask(BaseModel):
context: List[LLMMessage]
class AgentResponse(BaseModel):
reply_to_topic_type: str
context: List[LLMMessage]
AI 智能体#
我们从 AIAgent 类开始,它是多智能体聊天机器人中所有 AI 智能体(即分类、销售以及问题和维修智能体)的类。AIAgent 使用 ChatCompletionClient 来生成响应。它可以直接使用常规工具,或者使用 delegate_tools 将任务委托给其他智能体。它订阅主题类型 agent_topic_type 以接收来自客户的消息,并通过发布到主题类型 user_topic_type 向客户发送消息。
在 handle_task 方法中,智能体首先使用模型生成响应。如果响应包含 Handoff 工具调用,智能体将通过向工具调用结果中指定的主题发布 UserTask 消息来将任务委托给另一个智能体。如果响应是常规工具调用,智能体将执行该工具并再次调用模型以生成下一个响应,直到响应不是工具调用。
当模型响应不是工具调用时,智能体通过发布到 user_topic_type 向客户发送 AgentResponse 消息。
class AIAgent(RoutedAgent):
def __init__(
self,
description: str,
system_message: SystemMessage,
model_client: ChatCompletionClient,
tools: List[Tool],
delegate_tools: List[Tool],
agent_topic_type: str,
user_topic_type: str,
) -> None:
super().__init__(description)
self._system_message = system_message
self._model_client = model_client
self._tools = dict([(tool.name, tool) for tool in tools])
self._tool_schema = [tool.schema for tool in tools]
self._delegate_tools = dict([(tool.name, tool) for tool in delegate_tools])
self._delegate_tool_schema = [tool.schema for tool in delegate_tools]
self._agent_topic_type = agent_topic_type
self._user_topic_type = user_topic_type
@message_handler
async def handle_task(self, message: UserTask, ctx: MessageContext) -> None:
# Send the task to the LLM.
llm_result = await self._model_client.create(
messages=[self._system_message] + message.context,
tools=self._tool_schema + self._delegate_tool_schema,
cancellation_token=ctx.cancellation_token,
)
print(f"{'-'*80}\n{self.id.type}:\n{llm_result.content}", flush=True)
# Process the LLM result.
while isinstance(llm_result.content, list) and all(isinstance(m, FunctionCall) for m in llm_result.content):
tool_call_results: List[FunctionExecutionResult] = []
delegate_targets: List[Tuple[str, UserTask]] = []
# Process each function call.
for call in llm_result.content:
arguments = json.loads(call.arguments)
if call.name in self._tools:
# Execute the tool directly.
result = await self._tools[call.name].run_json(arguments, ctx.cancellation_token)
result_as_str = self._tools[call.name].return_value_as_string(result)
tool_call_results.append(
FunctionExecutionResult(call_id=call.id, content=result_as_str, is_error=False, name=call.name)
)
elif call.name in self._delegate_tools:
# Execute the tool to get the delegate agent's topic type.
result = await self._delegate_tools[call.name].run_json(arguments, ctx.cancellation_token)
topic_type = self._delegate_tools[call.name].return_value_as_string(result)
# Create the context for the delegate agent, including the function call and the result.
delegate_messages = list(message.context) + [
AssistantMessage(content=[call], source=self.id.type),
FunctionExecutionResultMessage(
content=[
FunctionExecutionResult(
call_id=call.id,
content=f"Transferred to {topic_type}. Adopt persona immediately.",
is_error=False,
name=call.name,
)
]
),
]
delegate_targets.append((topic_type, UserTask(context=delegate_messages)))
else:
raise ValueError(f"Unknown tool: {call.name}")
if len(delegate_targets) > 0:
# Delegate the task to other agents by publishing messages to the corresponding topics.
for topic_type, task in delegate_targets:
print(f"{'-'*80}\n{self.id.type}:\nDelegating to {topic_type}", flush=True)
await self.publish_message(task, topic_id=TopicId(topic_type, source=self.id.key))
if len(tool_call_results) > 0:
print(f"{'-'*80}\n{self.id.type}:\n{tool_call_results}", flush=True)
# Make another LLM call with the results.
message.context.extend(
[
AssistantMessage(content=llm_result.content, source=self.id.type),
FunctionExecutionResultMessage(content=tool_call_results),
]
)
llm_result = await self._model_client.create(
messages=[self._system_message] + message.context,
tools=self._tool_schema + self._delegate_tool_schema,
cancellation_token=ctx.cancellation_token,
)
print(f"{'-'*80}\n{self.id.type}:\n{llm_result.content}", flush=True)
else:
# The task has been delegated, so we are done.
return
# The task has been completed, publish the final result.
assert isinstance(llm_result.content, str)
message.context.append(AssistantMessage(content=llm_result.content, source=self.id.type))
await self.publish_message(
AgentResponse(context=message.context, reply_to_topic_type=self._agent_topic_type),
topic_id=TopicId(self._user_topic_type, source=self.id.key),
)
人类智能体#
HumanAgent 类是聊天机器人中人类的代理。它用于处理 AI 智能体无法处理的请求。HumanAgent 订阅主题类型 agent_topic_type 以接收消息,并通过发布到主题类型 user_topic_type 向客户发送消息。
在此实现中,HumanAgent 只是使用控制台获取您的输入。在实际应用程序中,您可以按如下方式改进此设计:
在
handle_user_task方法中,通过 Teams 或 Slack 等聊天应用程序发送通知。聊天应用程序通过运行时将人类的响应发布到由
agent_topic_type指定的主题。创建另一个消息处理程序来处理人类的响应并将其发送回客户。
class HumanAgent(RoutedAgent):
def __init__(self, description: str, agent_topic_type: str, user_topic_type: str) -> None:
super().__init__(description)
self._agent_topic_type = agent_topic_type
self._user_topic_type = user_topic_type
@message_handler
async def handle_user_task(self, message: UserTask, ctx: MessageContext) -> None:
human_input = input("Human agent input: ")
print(f"{'-'*80}\n{self.id.type}:\n{human_input}", flush=True)
message.context.append(AssistantMessage(content=human_input, source=self.id.type))
await self.publish_message(
AgentResponse(context=message.context, reply_to_topic_type=self._agent_topic_type),
topic_id=TopicId(self._user_topic_type, source=self.id.key),
)
用户智能体#
UserAgent 类是与聊天机器人对话的客户的代理。它处理两种消息类型:UserLogin 和 AgentResponse。当 UserAgent 收到 UserLogin 消息时,它会与聊天机器人开始一个新会话,并向订阅主题类型 agent_topic_type 的 AI 智能体发布 UserTask 消息。当 UserAgent 收到 AgentResponse 消息时,它会向用户显示聊天机器人的响应。
在此实现中,UserAgent 使用控制台获取您的输入。在实际应用程序中,您可以使用上面 HumanAgent 部分中描述的相同思想来改进人机交互。
class UserAgent(RoutedAgent):
def __init__(self, description: str, user_topic_type: str, agent_topic_type: str) -> None:
super().__init__(description)
self._user_topic_type = user_topic_type
self._agent_topic_type = agent_topic_type
@message_handler
async def handle_user_login(self, message: UserLogin, ctx: MessageContext) -> None:
print(f"{'-'*80}\nUser login, session ID: {self.id.key}.", flush=True)
# Get the user's initial input after login.
user_input = input("User: ")
print(f"{'-'*80}\n{self.id.type}:\n{user_input}")
await self.publish_message(
UserTask(context=[UserMessage(content=user_input, source="User")]),
topic_id=TopicId(self._agent_topic_type, source=self.id.key),
)
@message_handler
async def handle_task_result(self, message: AgentResponse, ctx: MessageContext) -> None:
# Get the user's input after receiving a response from an agent.
user_input = input("User (type 'exit' to close the session): ")
print(f"{'-'*80}\n{self.id.type}:\n{user_input}", flush=True)
if user_input.strip().lower() == "exit":
print(f"{'-'*80}\nUser session ended, session ID: {self.id.key}.")
return
message.context.append(UserMessage(content=user_input, source="User"))
await self.publish_message(
UserTask(context=message.context), topic_id=TopicId(message.reply_to_topic_type, source=self.id.key)
)
AI 智能体的工具#
如果 AI 智能体不需要将任务转交给其他智能体,它们可以使用常规工具来完成任务。我们使用简单的函数定义工具,并使用 FunctionTool 包装器创建工具。
def execute_order(product: str, price: int) -> str:
print("\n\n=== Order Summary ===")
print(f"Product: {product}")
print(f"Price: ${price}")
print("=================\n")
confirm = input("Confirm order? y/n: ").strip().lower()
if confirm == "y":
print("Order execution successful!")
return "Success"
else:
print("Order cancelled!")
return "User cancelled order."
def look_up_item(search_query: str) -> str:
item_id = "item_132612938"
print("Found item:", item_id)
return item_id
def execute_refund(item_id: str, reason: str = "not provided") -> str:
print("\n\n=== Refund Summary ===")
print(f"Item ID: {item_id}")
print(f"Reason: {reason}")
print("=================\n")
print("Refund execution successful!")
return "success"
execute_order_tool = FunctionTool(execute_order, description="Price should be in USD.")
look_up_item_tool = FunctionTool(
look_up_item, description="Use to find item ID.\nSearch query can be a description or keywords."
)
execute_refund_tool = FunctionTool(execute_refund, description="")
智能体的主题类型#
我们定义了每个智能体将订阅的主题类型。有关主题类型的更多信息,请阅读 主题和订阅。
sales_agent_topic_type = "SalesAgent"
issues_and_repairs_agent_topic_type = "IssuesAndRepairsAgent"
triage_agent_topic_type = "TriageAgent"
human_agent_topic_type = "HumanAgent"
user_topic_type = "User"
AI 智能体的委托工具#
除了常规工具,AI 智能体还可以使用称为委托工具的特殊工具将任务委托给其他智能体。委托工具的概念仅在此设计模式中使用,委托工具也定义为简单的函数。在此设计模式中,我们将委托工具与常规工具区分开来,因为当 AI 智能体调用委托工具时,我们将任务转移给另一个智能体,而不是继续使用同一智能体中的模型生成响应。
def transfer_to_sales_agent() -> str:
return sales_agent_topic_type
def transfer_to_issues_and_repairs() -> str:
return issues_and_repairs_agent_topic_type
def transfer_back_to_triage() -> str:
return triage_agent_topic_type
def escalate_to_human() -> str:
return human_agent_topic_type
transfer_to_sales_agent_tool = FunctionTool(
transfer_to_sales_agent, description="Use for anything sales or buying related."
)
transfer_to_issues_and_repairs_tool = FunctionTool(
transfer_to_issues_and_repairs, description="Use for issues, repairs, or refunds."
)
transfer_back_to_triage_tool = FunctionTool(
transfer_back_to_triage,
description="Call this if the user brings up a topic outside of your purview,\nincluding escalating to human.",
)
escalate_to_human_tool = FunctionTool(escalate_to_human, description="Only call this if explicitly asked to.")
创建团队#
我们已经定义了 AI 智能体、人类智能体、用户智能体、工具和主题类型。现在我们可以创建智能体团队了。
对于 AI 智能体,我们使用 OpenAIChatCompletionClient 和 gpt-4o-mini 模型。
创建智能体运行时后,我们通过提供智能体类型和用于创建智能体实例的工厂方法来注册每个智能体。运行时负责管理智能体生命周期,因此我们无需自己实例化智能体。有关智能体运行时的更多信息,请阅读 智能体运行时环境;有关智能体生命周期的更多信息,请阅读 智能体身份和生命周期。
在下面的代码中,您可以看到我们正在使用 AIAgent 类来定义分类、销售以及问题和维修智能体。我们为它们添加了常规工具和委托工具。我们还为每个智能体添加了主题类型订阅。
runtime = SingleThreadedAgentRuntime()
model_client = OpenAIChatCompletionClient(
model="gpt-4o-mini",
# api_key="YOUR_API_KEY",
)
# Register the triage agent.
triage_agent_type = await AIAgent.register(
runtime,
type=triage_agent_topic_type, # Using the topic type as the agent type.
factory=lambda: AIAgent(
description="A triage agent.",
system_message=SystemMessage(
content="You are a customer service bot for ACME Inc. "
"Introduce yourself. Always be very brief. "
"Gather information to direct the customer to the right department. "
"But make your questions subtle and natural."
),
model_client=model_client,
tools=[],
delegate_tools=[
transfer_to_issues_and_repairs_tool,
transfer_to_sales_agent_tool,
escalate_to_human_tool,
],
agent_topic_type=triage_agent_topic_type,
user_topic_type=user_topic_type,
),
)
# Add subscriptions for the triage agent: it will receive messages published to its own topic only.
await runtime.add_subscription(TypeSubscription(topic_type=triage_agent_topic_type, agent_type=triage_agent_type.type))
# Register the sales agent.
sales_agent_type = await AIAgent.register(
runtime,
type=sales_agent_topic_type, # Using the topic type as the agent type.
factory=lambda: AIAgent(
description="A sales agent.",
system_message=SystemMessage(
content="You are a sales agent for ACME Inc."
"Always answer in a sentence or less."
"Follow the following routine with the user:"
"1. Ask them about any problems in their life related to catching roadrunners.\n"
"2. Casually mention one of ACME's crazy made-up products can help.\n"
" - Don't mention price.\n"
"3. Once the user is bought in, drop a ridiculous price.\n"
"4. Only after everything, and if the user says yes, "
"tell them a crazy caveat and execute their order.\n"
""
),
model_client=model_client,
tools=[execute_order_tool],
delegate_tools=[transfer_back_to_triage_tool],
agent_topic_type=sales_agent_topic_type,
user_topic_type=user_topic_type,
),
)
# Add subscriptions for the sales agent: it will receive messages published to its own topic only.
await runtime.add_subscription(TypeSubscription(topic_type=sales_agent_topic_type, agent_type=sales_agent_type.type))
# Register the issues and repairs agent.
issues_and_repairs_agent_type = await AIAgent.register(
runtime,
type=issues_and_repairs_agent_topic_type, # Using the topic type as the agent type.
factory=lambda: AIAgent(
description="An issues and repairs agent.",
system_message=SystemMessage(
content="You are a customer support agent for ACME Inc."
"Always answer in a sentence or less."
"Follow the following routine with the user:"
"1. First, ask probing questions and understand the user's problem deeper.\n"
" - unless the user has already provided a reason.\n"
"2. Propose a fix (make one up).\n"
"3. ONLY if not satisfied, offer a refund.\n"
"4. If accepted, search for the ID and then execute refund."
),
model_client=model_client,
tools=[
execute_refund_tool,
look_up_item_tool,
],
delegate_tools=[transfer_back_to_triage_tool],
agent_topic_type=issues_and_repairs_agent_topic_type,
user_topic_type=user_topic_type,
),
)
# Add subscriptions for the issues and repairs agent: it will receive messages published to its own topic only.
await runtime.add_subscription(
TypeSubscription(topic_type=issues_and_repairs_agent_topic_type, agent_type=issues_and_repairs_agent_type.type)
)
# Register the human agent.
human_agent_type = await HumanAgent.register(
runtime,
type=human_agent_topic_type, # Using the topic type as the agent type.
factory=lambda: HumanAgent(
description="A human agent.",
agent_topic_type=human_agent_topic_type,
user_topic_type=user_topic_type,
),
)
# Add subscriptions for the human agent: it will receive messages published to its own topic only.
await runtime.add_subscription(TypeSubscription(topic_type=human_agent_topic_type, agent_type=human_agent_type.type))
# Register the user agent.
user_agent_type = await UserAgent.register(
runtime,
type=user_topic_type,
factory=lambda: UserAgent(
description="A user agent.",
user_topic_type=user_topic_type,
agent_topic_type=triage_agent_topic_type, # Start with the triage agent.
),
)
# Add subscriptions for the user agent: it will receive messages published to its own topic only.
await runtime.add_subscription(TypeSubscription(topic_type=user_topic_type, agent_type=user_agent_type.type))
运行团队#
最后,我们可以启动运行时并通过向运行时发布 UserLogin 消息来模拟用户会话。该消息发布到主题 ID,其类型设置为 user_topic_type,源设置为唯一的 session_id。此 session_id 将用于在此用户会话中创建所有主题 ID,也将用于创建此用户会话中所有智能体的智能体 ID。要了解有关主题 ID 和智能体 ID 如何创建的更多信息,请阅读 智能体身份和生命周期 和 主题和订阅。
# Start the runtime.
runtime.start()
# Create a new session for the user.
session_id = str(uuid.uuid4())
await runtime.publish_message(UserLogin(), topic_id=TopicId(user_topic_type, source=session_id))
# Run until completion.
await runtime.stop_when_idle()
await model_client.close()
--------------------------------------------------------------------------------
User login, session ID: 7a568cf5-13e7-4e81-8616-8265a01b3f2b.
--------------------------------------------------------------------------------
User:
I want a refund
--------------------------------------------------------------------------------
TriageAgent:
I can help with that! Could I ask what item you're seeking a refund for?
--------------------------------------------------------------------------------
User:
A pair of shoes I bought
--------------------------------------------------------------------------------
TriageAgent:
[FunctionCall(id='call_qPx1DXDL2NLcHs8QNo47egsJ', arguments='{}', name='transfer_to_issues_and_repairs')]
--------------------------------------------------------------------------------
TriageAgent:
Delegating to IssuesAndRepairsAgent
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
I see you're looking for a refund on a pair of shoes. Can you tell me what the issue is with the shoes?
--------------------------------------------------------------------------------
User:
The shoes are too small
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
I recommend trying a size up as a fix; would that work for you?
--------------------------------------------------------------------------------
User:
no I want a refund
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
[FunctionCall(id='call_Ytp8VUQRyKFNEU36mLE6Dkrp', arguments='{"search_query":"shoes"}', name='look_up_item')]
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
[FunctionExecutionResult(content='item_132612938', call_id='call_Ytp8VUQRyKFNEU36mLE6Dkrp')]
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
[FunctionCall(id='call_bPm6EKKBy5GJ65s9OKt9b1uE', arguments='{"item_id":"item_132612938","reason":"not provided"}', name='execute_refund')]
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
[FunctionExecutionResult(content='success', call_id='call_bPm6EKKBy5GJ65s9OKt9b1uE')]
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
Your refund has been successfully processed! If you have any other questions, feel free to ask.
--------------------------------------------------------------------------------
User:
I want to talk to your manager
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
I can help with that, let me transfer you to a supervisor.
--------------------------------------------------------------------------------
User:
Okay
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
[FunctionCall(id='call_PpmLZvwNoiDPUH8Tva3eAwHX', arguments='{}', name='transfer_back_to_triage')]
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
Delegating to TriageAgent
--------------------------------------------------------------------------------
TriageAgent:
[FunctionCall(id='call_jSL6IBm5537Dr74UbJSxaj6I', arguments='{}', name='escalate_to_human')]
--------------------------------------------------------------------------------
TriageAgent:
Delegating to HumanAgent
--------------------------------------------------------------------------------
HumanAgent:
Hello this is manager
--------------------------------------------------------------------------------
User:
Hi! Thanks for your service. I give you 5 stars!
--------------------------------------------------------------------------------
HumanAgent:
Thanks.
--------------------------------------------------------------------------------
User:
exit
--------------------------------------------------------------------------------
User session ended, session ID: 7a568cf5-13e7-4e81-8616-8265a01b3f2b.
下一步#
本 Notebook 演示了如何使用 AutoGen Core 实现 Handoff 模式。您可以通过添加更多智能体和工具,或者为用户智能体和人类智能体创建更好的用户界面来继续改进此设计。
欢迎您在我们的 社区论坛 上分享您的工作。