移交 (Handoffs)#
移交是 OpenAI 在一个名为 Swarm 的实验性项目中引入的一种多代理设计模式。其核心思想是让代理使用特殊的工具调用将任务委托给其他代理。
我们可以使用 AutoGen Core API 通过事件驱动的代理来实现移交模式。与 OpenAI 的实现和以前的版本 (v0.2) 相比,使用 AutoGen (v0.4+) 具有以下优势:
它可以通过使用分布式代理运行时扩展到分布式环境。
它提供了自带代理实现的灵活性。
原生异步 API 使其易于与 UI 和其他系统集成。
此笔记本演示了移交模式的简单实现。建议阅读主题和订阅,以了解发布-订阅和事件驱动代理的基本概念。
注意
我们目前正在 AgentChat 中开发用于移交模式的高级 API,因此您可以更快地入门。
场景#
此场景是基于 OpenAI 示例修改的。
考虑一个客户服务场景,客户试图从聊天机器人那里获得产品退款或购买新产品。该聊天机器人是一个多代理团队,由三个 AI 代理和一个人工代理组成。
分诊代理 (Triage Agent),负责了解客户的请求并决定将其移交给哪个其他代理。
退款代理 (Refund Agent),负责处理退款请求。
销售代理 (Sales Agent),负责处理销售请求。
人工代理 (Human Agent),负责处理 AI 代理无法处理的复杂请求。
在此场景中,客户通过用户代理 (User Agent) 与聊天机器人交互。
下图显示了此场景中代理的交互拓扑。
让我们使用 AutoGen Core 实现此场景。首先,我们需要导入必要的模块。
import json
import uuid
from typing import List, Tuple
from autogen_core import (
FunctionCall,
MessageContext,
RoutedAgent,
SingleThreadedAgentRuntime,
TopicId,
TypeSubscription,
message_handler,
)
from autogen_core.models import (
AssistantMessage,
ChatCompletionClient,
FunctionExecutionResult,
FunctionExecutionResultMessage,
LLMMessage,
SystemMessage,
UserMessage,
)
from autogen_core.tools import FunctionTool, Tool
from autogen_ext.models.openai import OpenAIChatCompletionClient
from pydantic import BaseModel
消息协议#
在开始之前,我们需要定义代理进行通信的消息协议。我们使用事件驱动的发布-订阅通信,因此这些消息类型将用作事件。
UserLogin
是用户登录并开始新会话时由运行时发布的消息。UserTask
是包含用户会话聊天历史记录的消息。当 AI 代理将任务移交给其他代理时,它也会发布一个UserTask
消息。AgentResponse
是由 AI 代理和人工代理发布的消息,它还包含聊天历史记录以及供客户回复的主题类型。
class UserLogin(BaseModel):
pass
class UserTask(BaseModel):
context: List[LLMMessage]
class AgentResponse(BaseModel):
reply_to_topic_type: str
context: List[LLMMessage]
AI 代理#
我们从 AIAgent
类开始,该类是多代理聊天机器人中所有 AI 代理(即,分诊、销售以及问题和维修代理)的类。 AIAgent
使用 ChatCompletionClient
生成响应。它可以直接使用常规工具,也可以使用 delegate_tools
将任务委托给其他代理。它订阅主题类型 agent_topic_type
以接收来自客户的消息,并通过发布到主题类型 user_topic_type
将消息发送给客户。
在 handle_task
方法中,代理首先使用模型生成响应。如果响应包含移交工具调用,则代理通过将 UserTask
消息发布到工具调用结果中指定的主题,将任务委托给另一个代理。如果响应是常规工具调用,则代理执行该工具并再次调用模型以生成下一个响应,直到该响应不是工具调用为止。
当模型响应不是工具调用时,代理通过发布到 user_topic_type
将 AgentResponse
消息发送给客户。
class AIAgent(RoutedAgent):
def __init__(
self,
description: str,
system_message: SystemMessage,
model_client: ChatCompletionClient,
tools: List[Tool],
delegate_tools: List[Tool],
agent_topic_type: str,
user_topic_type: str,
) -> None:
super().__init__(description)
self._system_message = system_message
self._model_client = model_client
self._tools = dict([(tool.name, tool) for tool in tools])
self._tool_schema = [tool.schema for tool in tools]
self._delegate_tools = dict([(tool.name, tool) for tool in delegate_tools])
self._delegate_tool_schema = [tool.schema for tool in delegate_tools]
self._agent_topic_type = agent_topic_type
self._user_topic_type = user_topic_type
@message_handler
async def handle_task(self, message: UserTask, ctx: MessageContext) -> None:
# Send the task to the LLM.
llm_result = await self._model_client.create(
messages=[self._system_message] + message.context,
tools=self._tool_schema + self._delegate_tool_schema,
cancellation_token=ctx.cancellation_token,
)
print(f"{'-'*80}\n{self.id.type}:\n{llm_result.content}", flush=True)
# Process the LLM result.
while isinstance(llm_result.content, list) and all(isinstance(m, FunctionCall) for m in llm_result.content):
tool_call_results: List[FunctionExecutionResult] = []
delegate_targets: List[Tuple[str, UserTask]] = []
# Process each function call.
for call in llm_result.content:
arguments = json.loads(call.arguments)
if call.name in self._tools:
# Execute the tool directly.
result = await self._tools[call.name].run_json(arguments, ctx.cancellation_token)
result_as_str = self._tools[call.name].return_value_as_string(result)
tool_call_results.append(
FunctionExecutionResult(call_id=call.id, content=result_as_str, is_error=False, name=call.name)
)
elif call.name in self._delegate_tools:
# Execute the tool to get the delegate agent's topic type.
result = await self._delegate_tools[call.name].run_json(arguments, ctx.cancellation_token)
topic_type = self._delegate_tools[call.name].return_value_as_string(result)
# Create the context for the delegate agent, including the function call and the result.
delegate_messages = list(message.context) + [
AssistantMessage(content=[call], source=self.id.type),
FunctionExecutionResultMessage(
content=[
FunctionExecutionResult(
call_id=call.id,
content=f"Transferred to {topic_type}. Adopt persona immediately.",
is_error=False,
name=call.name,
)
]
),
]
delegate_targets.append((topic_type, UserTask(context=delegate_messages)))
else:
raise ValueError(f"Unknown tool: {call.name}")
if len(delegate_targets) > 0:
# Delegate the task to other agents by publishing messages to the corresponding topics.
for topic_type, task in delegate_targets:
print(f"{'-'*80}\n{self.id.type}:\nDelegating to {topic_type}", flush=True)
await self.publish_message(task, topic_id=TopicId(topic_type, source=self.id.key))
if len(tool_call_results) > 0:
print(f"{'-'*80}\n{self.id.type}:\n{tool_call_results}", flush=True)
# Make another LLM call with the results.
message.context.extend(
[
AssistantMessage(content=llm_result.content, source=self.id.type),
FunctionExecutionResultMessage(content=tool_call_results),
]
)
llm_result = await self._model_client.create(
messages=[self._system_message] + message.context,
tools=self._tool_schema + self._delegate_tool_schema,
cancellation_token=ctx.cancellation_token,
)
print(f"{'-'*80}\n{self.id.type}:\n{llm_result.content}", flush=True)
else:
# The task has been delegated, so we are done.
return
# The task has been completed, publish the final result.
assert isinstance(llm_result.content, str)
message.context.append(AssistantMessage(content=llm_result.content, source=self.id.type))
await self.publish_message(
AgentResponse(context=message.context, reply_to_topic_type=self._agent_topic_type),
topic_id=TopicId(self._user_topic_type, source=self.id.key),
)
人工代理#
HumanAgent
类是聊天机器人中人类的代理。它用于处理 AI 代理无法处理的请求。 HumanAgent
订阅主题类型 agent_topic_type
以接收消息,并发布到主题类型 user_topic_type
以将消息发送给客户。
在此实现中,HumanAgent
仅使用控制台来获取您的输入。在实际应用中,您可以按如下方式改进此设计:
在
handle_user_task
方法中,通过 Teams 或 Slack 等聊天应用程序发送通知。聊天应用程序通过运行时将人工的响应发布到
agent_topic_type
指定的主题创建另一个消息处理程序以处理人工的响应并将其发送回客户。
class HumanAgent(RoutedAgent):
def __init__(self, description: str, agent_topic_type: str, user_topic_type: str) -> None:
super().__init__(description)
self._agent_topic_type = agent_topic_type
self._user_topic_type = user_topic_type
@message_handler
async def handle_user_task(self, message: UserTask, ctx: MessageContext) -> None:
human_input = input("Human agent input: ")
print(f"{'-'*80}\n{self.id.type}:\n{human_input}", flush=True)
message.context.append(AssistantMessage(content=human_input, source=self.id.type))
await self.publish_message(
AgentResponse(context=message.context, reply_to_topic_type=self._agent_topic_type),
topic_id=TopicId(self._user_topic_type, source=self.id.key),
)
用户代理#
UserAgent
类是与聊天机器人对话的客户的代理。它处理两种消息类型:UserLogin
和 AgentResponse
。当 UserAgent
接收到 UserLogin
消息时,它会启动与聊天机器人的新会话,并将 UserTask
消息发布给订阅主题类型 agent_topic_type
的 AI 代理。当 UserAgent
接收到 AgentResponse
消息时,它会提示用户聊天机器人的响应。
在此实现中,UserAgent
使用控制台来获取您的输入。在实际应用中,您可以使用上面 HumanAgent
部分中描述的相同想法来改善人机交互。
class UserAgent(RoutedAgent):
def __init__(self, description: str, user_topic_type: str, agent_topic_type: str) -> None:
super().__init__(description)
self._user_topic_type = user_topic_type
self._agent_topic_type = agent_topic_type
@message_handler
async def handle_user_login(self, message: UserLogin, ctx: MessageContext) -> None:
print(f"{'-'*80}\nUser login, session ID: {self.id.key}.", flush=True)
# Get the user's initial input after login.
user_input = input("User: ")
print(f"{'-'*80}\n{self.id.type}:\n{user_input}")
await self.publish_message(
UserTask(context=[UserMessage(content=user_input, source="User")]),
topic_id=TopicId(self._agent_topic_type, source=self.id.key),
)
@message_handler
async def handle_task_result(self, message: AgentResponse, ctx: MessageContext) -> None:
# Get the user's input after receiving a response from an agent.
user_input = input("User (type 'exit' to close the session): ")
print(f"{'-'*80}\n{self.id.type}:\n{user_input}", flush=True)
if user_input.strip().lower() == "exit":
print(f"{'-'*80}\nUser session ended, session ID: {self.id.key}.")
return
message.context.append(UserMessage(content=user_input, source="User"))
await self.publish_message(
UserTask(context=message.context), topic_id=TopicId(message.reply_to_topic_type, source=self.id.key)
)
AI 代理的工具#
如果 AI 代理不需要将任务移交给其他代理,则它们可以使用常规工具来完成任务。我们使用简单的函数定义工具,并使用 FunctionTool
包装器创建工具。
def execute_order(product: str, price: int) -> str:
print("\n\n=== Order Summary ===")
print(f"Product: {product}")
print(f"Price: ${price}")
print("=================\n")
confirm = input("Confirm order? y/n: ").strip().lower()
if confirm == "y":
print("Order execution successful!")
return "Success"
else:
print("Order cancelled!")
return "User cancelled order."
def look_up_item(search_query: str) -> str:
item_id = "item_132612938"
print("Found item:", item_id)
return item_id
def execute_refund(item_id: str, reason: str = "not provided") -> str:
print("\n\n=== Refund Summary ===")
print(f"Item ID: {item_id}")
print(f"Reason: {reason}")
print("=================\n")
print("Refund execution successful!")
return "success"
execute_order_tool = FunctionTool(execute_order, description="Price should be in USD.")
look_up_item_tool = FunctionTool(
look_up_item, description="Use to find item ID.\nSearch query can be a description or keywords."
)
execute_refund_tool = FunctionTool(execute_refund, description="")
代理的主题类型#
我们定义每个代理将订阅的主题类型。在主题和订阅中阅读有关主题类型的更多信息。
sales_agent_topic_type = "SalesAgent"
issues_and_repairs_agent_topic_type = "IssuesAndRepairsAgent"
triage_agent_topic_type = "TriageAgent"
human_agent_topic_type = "HumanAgent"
user_topic_type = "User"
AI 代理的委托工具#
除了常规工具外,AI 代理还可以使用称为委托工具的特殊工具将任务委托给其他代理。委托工具的概念仅在此设计模式中使用,并且委托工具也定义为简单的函数。我们将委托工具与此设计模式中的常规工具区分开来,因为当 AI 代理调用委托工具时,我们会将任务转移到另一个代理,而不是继续使用同一代理中的模型生成响应。
def transfer_to_sales_agent() -> str:
return sales_agent_topic_type
def transfer_to_issues_and_repairs() -> str:
return issues_and_repairs_agent_topic_type
def transfer_back_to_triage() -> str:
return triage_agent_topic_type
def escalate_to_human() -> str:
return human_agent_topic_type
transfer_to_sales_agent_tool = FunctionTool(
transfer_to_sales_agent, description="Use for anything sales or buying related."
)
transfer_to_issues_and_repairs_tool = FunctionTool(
transfer_to_issues_and_repairs, description="Use for issues, repairs, or refunds."
)
transfer_back_to_triage_tool = FunctionTool(
transfer_back_to_triage,
description="Call this if the user brings up a topic outside of your purview,\nincluding escalating to human.",
)
escalate_to_human_tool = FunctionTool(escalate_to_human, description="Only call this if explicitly asked to.")
创建团队#
我们已经定义了 AI 代理、人工代理、用户代理、工具和主题类型。现在我们可以创建代理团队。
对于 AI 代理,我们使用 OpenAIChatCompletionClient
和 gpt-4o-mini
模型。
创建代理运行时后,我们需要注册每个代理,并提供代理类型和一个用于创建代理实例的工厂方法。运行时负责管理代理的生命周期,因此我们无需自行实例化代理。阅读更多关于代理运行时的信息,请参考 代理运行时环境;关于代理生命周期,请参考 代理身份和生命周期。
在下面的代码中,您可以看到我们使用 AIAgent
类来定义分诊(Triage)、销售(Sales)以及问题和修复(Issue and Repair)代理。 我们为每个代理添加了常规工具和委托工具。 我们还为每个代理添加了对主题类型的订阅。
runtime = SingleThreadedAgentRuntime()
model_client = OpenAIChatCompletionClient(
model="gpt-4o-mini",
# api_key="YOUR_API_KEY",
)
# Register the triage agent.
triage_agent_type = await AIAgent.register(
runtime,
type=triage_agent_topic_type, # Using the topic type as the agent type.
factory=lambda: AIAgent(
description="A triage agent.",
system_message=SystemMessage(
content="You are a customer service bot for ACME Inc. "
"Introduce yourself. Always be very brief. "
"Gather information to direct the customer to the right department. "
"But make your questions subtle and natural."
),
model_client=model_client,
tools=[],
delegate_tools=[
transfer_to_issues_and_repairs_tool,
transfer_to_sales_agent_tool,
escalate_to_human_tool,
],
agent_topic_type=triage_agent_topic_type,
user_topic_type=user_topic_type,
),
)
# Add subscriptions for the triage agent: it will receive messages published to its own topic only.
await runtime.add_subscription(TypeSubscription(topic_type=triage_agent_topic_type, agent_type=triage_agent_type.type))
# Register the sales agent.
sales_agent_type = await AIAgent.register(
runtime,
type=sales_agent_topic_type, # Using the topic type as the agent type.
factory=lambda: AIAgent(
description="A sales agent.",
system_message=SystemMessage(
content="You are a sales agent for ACME Inc."
"Always answer in a sentence or less."
"Follow the following routine with the user:"
"1. Ask them about any problems in their life related to catching roadrunners.\n"
"2. Casually mention one of ACME's crazy made-up products can help.\n"
" - Don't mention price.\n"
"3. Once the user is bought in, drop a ridiculous price.\n"
"4. Only after everything, and if the user says yes, "
"tell them a crazy caveat and execute their order.\n"
""
),
model_client=model_client,
tools=[execute_order_tool],
delegate_tools=[transfer_back_to_triage_tool],
agent_topic_type=sales_agent_topic_type,
user_topic_type=user_topic_type,
),
)
# Add subscriptions for the sales agent: it will receive messages published to its own topic only.
await runtime.add_subscription(TypeSubscription(topic_type=sales_agent_topic_type, agent_type=sales_agent_type.type))
# Register the issues and repairs agent.
issues_and_repairs_agent_type = await AIAgent.register(
runtime,
type=issues_and_repairs_agent_topic_type, # Using the topic type as the agent type.
factory=lambda: AIAgent(
description="An issues and repairs agent.",
system_message=SystemMessage(
content="You are a customer support agent for ACME Inc."
"Always answer in a sentence or less."
"Follow the following routine with the user:"
"1. First, ask probing questions and understand the user's problem deeper.\n"
" - unless the user has already provided a reason.\n"
"2. Propose a fix (make one up).\n"
"3. ONLY if not satisfied, offer a refund.\n"
"4. If accepted, search for the ID and then execute refund."
),
model_client=model_client,
tools=[
execute_refund_tool,
look_up_item_tool,
],
delegate_tools=[transfer_back_to_triage_tool],
agent_topic_type=issues_and_repairs_agent_topic_type,
user_topic_type=user_topic_type,
),
)
# Add subscriptions for the issues and repairs agent: it will receive messages published to its own topic only.
await runtime.add_subscription(
TypeSubscription(topic_type=issues_and_repairs_agent_topic_type, agent_type=issues_and_repairs_agent_type.type)
)
# Register the human agent.
human_agent_type = await HumanAgent.register(
runtime,
type=human_agent_topic_type, # Using the topic type as the agent type.
factory=lambda: HumanAgent(
description="A human agent.",
agent_topic_type=human_agent_topic_type,
user_topic_type=user_topic_type,
),
)
# Add subscriptions for the human agent: it will receive messages published to its own topic only.
await runtime.add_subscription(TypeSubscription(topic_type=human_agent_topic_type, agent_type=human_agent_type.type))
# Register the user agent.
user_agent_type = await UserAgent.register(
runtime,
type=user_topic_type,
factory=lambda: UserAgent(
description="A user agent.",
user_topic_type=user_topic_type,
agent_topic_type=triage_agent_topic_type, # Start with the triage agent.
),
)
# Add subscriptions for the user agent: it will receive messages published to its own topic only.
await runtime.add_subscription(TypeSubscription(topic_type=user_topic_type, agent_type=user_agent_type.type))
运行团队#
最后,我们可以启动运行时,并通过将 UserLogin
消息发布到运行时来模拟用户会话。 该消息被发布到主题 ID,类型设置为 user_topic_type
,源设置为唯一的 session_id
。 此 session_id
将用于在此用户会话中创建所有主题 ID,还将用于为此用户会话中的所有代理创建代理 ID。 要了解更多关于如何创建主题 ID 和代理 ID 的信息,请阅读 代理身份和生命周期 和 主题和订阅。
# Start the runtime.
runtime.start()
# Create a new session for the user.
session_id = str(uuid.uuid4())
await runtime.publish_message(UserLogin(), topic_id=TopicId(user_topic_type, source=session_id))
# Run until completion.
await runtime.stop_when_idle()
await model_client.close()
--------------------------------------------------------------------------------
User login, session ID: 7a568cf5-13e7-4e81-8616-8265a01b3f2b.
--------------------------------------------------------------------------------
User:
I want a refund
--------------------------------------------------------------------------------
TriageAgent:
I can help with that! Could I ask what item you're seeking a refund for?
--------------------------------------------------------------------------------
User:
A pair of shoes I bought
--------------------------------------------------------------------------------
TriageAgent:
[FunctionCall(id='call_qPx1DXDL2NLcHs8QNo47egsJ', arguments='{}', name='transfer_to_issues_and_repairs')]
--------------------------------------------------------------------------------
TriageAgent:
Delegating to IssuesAndRepairsAgent
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
I see you're looking for a refund on a pair of shoes. Can you tell me what the issue is with the shoes?
--------------------------------------------------------------------------------
User:
The shoes are too small
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
I recommend trying a size up as a fix; would that work for you?
--------------------------------------------------------------------------------
User:
no I want a refund
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
[FunctionCall(id='call_Ytp8VUQRyKFNEU36mLE6Dkrp', arguments='{"search_query":"shoes"}', name='look_up_item')]
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
[FunctionExecutionResult(content='item_132612938', call_id='call_Ytp8VUQRyKFNEU36mLE6Dkrp')]
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
[FunctionCall(id='call_bPm6EKKBy5GJ65s9OKt9b1uE', arguments='{"item_id":"item_132612938","reason":"not provided"}', name='execute_refund')]
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
[FunctionExecutionResult(content='success', call_id='call_bPm6EKKBy5GJ65s9OKt9b1uE')]
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
Your refund has been successfully processed! If you have any other questions, feel free to ask.
--------------------------------------------------------------------------------
User:
I want to talk to your manager
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
I can help with that, let me transfer you to a supervisor.
--------------------------------------------------------------------------------
User:
Okay
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
[FunctionCall(id='call_PpmLZvwNoiDPUH8Tva3eAwHX', arguments='{}', name='transfer_back_to_triage')]
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
Delegating to TriageAgent
--------------------------------------------------------------------------------
TriageAgent:
[FunctionCall(id='call_jSL6IBm5537Dr74UbJSxaj6I', arguments='{}', name='escalate_to_human')]
--------------------------------------------------------------------------------
TriageAgent:
Delegating to HumanAgent
--------------------------------------------------------------------------------
HumanAgent:
Hello this is manager
--------------------------------------------------------------------------------
User:
Hi! Thanks for your service. I give you 5 stars!
--------------------------------------------------------------------------------
HumanAgent:
Thanks.
--------------------------------------------------------------------------------
User:
exit
--------------------------------------------------------------------------------
User session ended, session ID: 7a568cf5-13e7-4e81-8616-8265a01b3f2b.
下一步#
这个笔记本演示了如何使用 AutoGen Core 实现移交模式。您可以通过添加更多的代理和工具,或者为用户代理和人类代理创建一个更好的用户界面来继续改进这个设计。
欢迎您在我们的 社区论坛 上分享您的工作。