消息和通信#
AutoGen 核心中的代理可以响应、发送和发布消息,并且消息是代理之间相互通信的唯一方式。
消息#
消息是可序列化的对象,可以使用以下方法定义
Pydantic 的子类
pydantic.BaseModel
,或数据类
例如
from dataclasses import dataclass
@dataclass
class TextMessage:
content: str
source: str
@dataclass
class ImageMessage:
url: str
source: str
注意
消息纯粹是数据,不应包含任何逻辑。
消息处理程序#
当代理收到消息时,运行时将调用代理的消息处理程序 (on_message()
),该处理程序应实现代理的消息处理逻辑。如果代理无法处理此消息,则代理应引发 CantHandleException
。
基类 BaseAgent
不提供消息处理逻辑,除非用于高级用例,否则不建议直接实现 on_message()
方法。
开发人员应该首先实现 RoutedAgent
基类,该基类提供内置的消息路由功能。
按类型路由消息#
RoutedAgent
基类提供了一种机制,用于将消息类型与带有 message_handler()
装饰器的消息处理程序关联,因此开发人员无需实现 on_message()
方法。
例如,以下类型路由代理使用不同的消息处理程序响应 TextMessage
和 ImageMessage
from autogen_core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler
class MyAgent(RoutedAgent):
@message_handler
async def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:
print(f"Hello, {message.source}, you said {message.content}!")
@message_handler
async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:
print(f"Hello, {message.source}, you sent me {message.url}!")
创建代理运行时并注册代理类型(参见 代理和代理运行时)
runtime = SingleThreadedAgentRuntime()
await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My Agent"))
AgentType(type='my_agent')
使用 TextMessage
和 ImageMessage
测试此代理。
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="Hello, World!", source="User"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="User"), agent_id)
await runtime.stop_when_idle()
Hello, User, you said Hello, World!!
Hello, User, you sent me https://example.com/image.jpg!
运行时在传递第一条消息时,会自动创建带有代理 ID AgentId("my_agent", "default")
的 MyAgent
实例。
路由相同类型的消息#
在某些情况下,将相同类型的消息路由到不同的处理程序很有用。例如,来自不同发送代理的消息应区别对待。您可以使用 message_handler()
装饰器的 match
参数。
match
参数将相同消息类型的处理程序与特定消息关联 - 它位于消息类型路由之后。它接受一个可调用对象,该对象将消息和 MessageContext
作为参数,并返回一个布尔值,指示是否应由装饰的处理程序处理该消息。可调用对象按照处理程序的字母顺序进行检查。
这是一个代理的示例,该代理使用 match
参数根据发送者代理路由消息
class RoutedBySenderAgent(RoutedAgent):
@message_handler(match=lambda msg, ctx: msg.source.startswith("user1")) # type: ignore
async def on_user1_message(self, message: TextMessage, ctx: MessageContext) -> None:
print(f"Hello from user 1 handler, {message.source}, you said {message.content}!")
@message_handler(match=lambda msg, ctx: msg.source.startswith("user2")) # type: ignore
async def on_user2_message(self, message: TextMessage, ctx: MessageContext) -> None:
print(f"Hello from user 2 handler, {message.source}, you said {message.content}!")
@message_handler(match=lambda msg, ctx: msg.source.startswith("user2")) # type: ignore
async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:
print(f"Hello, {message.source}, you sent me {message.url}!")
上面的代理使用消息的 source
字段来确定发送者代理。如果可用,您还可以使用 MessageContext
的 sender
字段,使用代理 ID 来确定发送者代理。
让我们使用带有不同 source
值的消息来测试此代理
runtime = SingleThreadedAgentRuntime()
await RoutedBySenderAgent.register(runtime, "my_agent", lambda: RoutedBySenderAgent("Routed by sender agent"))
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="Hello, World!", source="user1-test"), agent_id)
await runtime.send_message(TextMessage(content="Hello, World!", source="user2-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user1-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user2-test"), agent_id)
await runtime.stop_when_idle()
Hello from user 1 handler, user1-test, you said Hello, World!!
Hello from user 2 handler, user2-test, you said Hello, World!!
Hello, user2-test, you sent me https://example.com/image.jpg!
在上面的示例中,第一个 ImageMessage
未被处理,因为消息的 source
字段与处理程序的 match
条件不匹配。
直接消息传递#
AutoGen 核心中有两种类型的通信
直接消息传递:将直接消息发送给另一个代理。
广播:将消息发布到主题。
让我们首先看看直接消息传递。要将直接消息发送给另一个代理,请在消息处理程序中使用 autogen_core.BaseAgent.send_message()
方法,从运行时使用 autogen_core.AgentRuntime.send_message()
方法。等待对这些方法的调用将返回接收代理的消息处理程序的返回值。当接收代理的处理程序返回 None
时,将返回 None
。
注意
如果在发送者等待时调用的代理引发异常,则该异常将传播回发送者。
请求/响应#
直接消息传递可用于请求/响应场景,其中发送者希望收到接收者的响应。接收者可以通过从其消息处理程序返回一个值来响应消息。您可以将此视为代理之间的函数调用。
例如,考虑以下代理
from dataclasses import dataclass
from autogen_core import MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler
@dataclass
class Message:
content: str
class InnerAgent(RoutedAgent):
@message_handler
async def on_my_message(self, message: Message, ctx: MessageContext) -> Message:
return Message(content=f"Hello from inner, {message.content}")
class OuterAgent(RoutedAgent):
def __init__(self, description: str, inner_agent_type: str):
super().__init__(description)
self.inner_agent_id = AgentId(inner_agent_type, self.id.key)
@message_handler
async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
print(f"Received message: {message.content}")
# Send a direct message to the inner agent and receives a response.
response = await self.send_message(Message(f"Hello from outer, {message.content}"), self.inner_agent_id)
print(f"Received inner response: {response.content}")
在收到消息后,OuterAgent
会向 InnerAgent
发送直接消息,并收到一条响应消息。
我们可以通过向 OuterAgent
发送 Message
来测试这些代理。
runtime = SingleThreadedAgentRuntime()
await InnerAgent.register(runtime, "inner_agent", lambda: InnerAgent("InnerAgent"))
await OuterAgent.register(runtime, "outer_agent", lambda: OuterAgent("OuterAgent", "inner_agent"))
runtime.start()
outer_agent_id = AgentId("outer_agent", "default")
await runtime.send_message(Message(content="Hello, World!"), outer_agent_id)
await runtime.stop_when_idle()
Received message: Hello, World!
Received inner response: Hello from inner, Hello from outer, Hello, World!
两个输出都由 OuterAgent
的消息处理程序生成,但是第二个输出基于 InnerAgent
的响应。
一般来说,直接消息传递适用于发送者和接收者紧密耦合的场景 - 它们一起创建,并且发送者链接到接收者的特定实例。例如,代理通过将直接消息发送到 ToolAgent
的实例来执行工具调用,并使用响应来形成动作观察循环。
广播#
广播实际上是具有主题和订阅的发布/订阅模型。阅读 主题和订阅 以了解核心概念。
直接消息传递和广播之间的关键区别在于,广播不能用于请求/响应场景。当代理发布消息时,它是单向的,它无法从任何其他代理收到响应,即使接收代理的处理程序返回一个值。
注意
如果为已发布的消息提供响应,则该响应将被丢弃。
注意
如果代理发布了它订阅的消息类型,它将不会收到它发布的消息。这是为了防止无限循环。
订阅和发布到主题#
基于类型的订阅 将发布到给定主题类型的主题的消息映射到给定代理类型的代理。要使继承自 RoutedAgent
的代理订阅给定主题类型的主题,您可以使用 type_subscription()
类装饰器。
以下示例展示了一个 ReceiverAgent
类,它使用 type_subscription()
装饰器订阅 "default"
主题类型的消息,并打印接收到的消息。
from autogen_core import RoutedAgent, message_handler, type_subscription
@type_subscription(topic_type="default")
class ReceivingAgent(RoutedAgent):
@message_handler
async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
print(f"Received a message: {message.content}")
要从代理的处理程序发布消息,请使用 publish_message()
方法并指定一个 TopicId
。此调用仍然必须被 await 以允许运行时安排将消息传递给所有订阅者,但它将始终返回 None
。如果代理在处理已发布的消息时引发异常,这将记录下来,但不会传播回发布代理。
以下示例展示了一个 BroadcastingAgent
,它在收到消息后将消息发布到主题。
from autogen_core import TopicId
class BroadcastingAgent(RoutedAgent):
@message_handler
async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
await self.publish_message(
Message("Publishing a message from broadcasting agent!"),
topic_id=TopicId(type="default", source=self.id.key),
)
BroadcastingAgent
将消息发布到类型为 "default"
的主题,并将源分配给代理实例的代理键。
订阅会在代理运行时注册,可以作为代理类型注册的一部分,也可以通过单独的 API 方法注册。下面是如何使用 type_subscription()
装饰器为接收代理注册 TypeSubscription
,以及为广播代理在没有装饰器的情况下注册。
from autogen_core import TypeSubscription
runtime = SingleThreadedAgentRuntime()
# Option 1: with type_subscription decorator
# The type_subscription class decorator automatically adds a TypeSubscription to
# the runtime when the agent is registered.
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))
# Option 2: with TypeSubscription
await BroadcastingAgent.register(runtime, "broadcasting_agent", lambda: BroadcastingAgent("Broadcasting Agent"))
await runtime.add_subscription(TypeSubscription(topic_type="default", agent_type="broadcasting_agent"))
# Start the runtime and publish a message.
runtime.start()
await runtime.publish_message(
Message("Hello, World! From the runtime!"), topic_id=TopicId(type="default", source="default")
)
await runtime.stop_when_idle()
Received a message: Hello, World! From the runtime!
Received a message: Publishing a message from broadcasting agent!
如上例所示,您还可以通过运行时的 publish_message()
方法直接发布到主题,而无需创建代理实例。
从输出中,您可以看到接收代理收到了两条消息:一条是通过运行时发布的,另一条是由广播代理发布的。
默认主题和订阅#
在上面的例子中,我们使用 TopicId
和 TypeSubscription
分别指定主题和订阅。对于许多场景,这是一种合适的方式。但是,当只有一个发布范围,即所有代理发布和订阅所有广播的消息时,我们可以使用方便的类 DefaultTopicId
和 default_subscription()
来简化我们的代码。
DefaultTopicId
用于创建将 "default"
用作主题类型的默认值,并将发布代理的键用作主题源的默认值的主题。default_subscription()
用于创建订阅默认主题的类型订阅。我们可以通过使用 DefaultTopicId
和 default_subscription()
来简化 BroadcastingAgent
。
from autogen_core import DefaultTopicId, default_subscription
@default_subscription
class BroadcastingAgentDefaultTopic(RoutedAgent):
@message_handler
async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
# Publish a message to all agents in the same namespace.
await self.publish_message(
Message("Publishing a message from broadcasting agent!"),
topic_id=DefaultTopicId(),
)
当运行时调用 register()
注册代理类型时,它会创建一个 TypeSubscription
,该订阅的主题类型使用 "default"
作为默认值,代理类型使用与在同一上下文中注册的代理类型相同的代理类型。
runtime = SingleThreadedAgentRuntime()
await BroadcastingAgentDefaultTopic.register(
runtime, "broadcasting_agent", lambda: BroadcastingAgentDefaultTopic("Broadcasting Agent")
)
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))
runtime.start()
await runtime.publish_message(Message("Hello, World! From the runtime!"), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
Received a message: Hello, World! From the runtime!
Received a message: Publishing a message from broadcasting agent!
注意
如果您的场景允许所有代理发布和订阅所有广播的消息,请使用 DefaultTopicId
和 default_subscription()
来装饰您的代理类。