主题和订阅示例场景#

简介#

在本 Cookbook 中,我们将探讨 AutoGen 中使用四种不同的广播场景,Agent 如何进行通信。这些场景说明了处理和在 Agent 之间分发消息的各种方法。我们将使用一个一致的税务管理公司处理客户请求的示例来演示每个场景。

场景概述#

想象一家税务管理公司,为客户提供各种服务,例如税务规划、争议解决、合规和准备。该公司拥有一支税务专家团队,每位专家都拥有其中一个领域的专业知识,以及一位监督运营的税务系统经理。

客户提交的请求需要由相应的专家处理。客户、税务系统经理和税务专家之间的沟通通过此系统中的广播来处理。

我们将探讨不同的广播场景如何影响消息在 Agent 之间分发的方式,以及如何使用它们来根据特定需求定制通信流程。


广播场景概述#

我们将介绍以下广播场景

  1. 单租户、单发布范围

  2. 多租户、单发布范围

  3. 单租户、多发布范围

  4. 多租户、多发布范围

每个场景代表系统中消息分发和 Agent 交互的不同方法。通过理解这些场景,您可以设计最适合您应用程序需求的 Agent 通信策略。

import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import List

from autogen_core import (
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    TopicId,
    TypeSubscription,
    message_handler,
)
from autogen_core._default_subscription import DefaultSubscription
from autogen_core._default_topic import DefaultTopicId
from autogen_core.models import (
    SystemMessage,
)
class TaxSpecialty(str, Enum):
    PLANNING = "planning"
    DISPUTE_RESOLUTION = "dispute_resolution"
    COMPLIANCE = "compliance"
    PREPARATION = "preparation"


@dataclass
class ClientRequest:
    content: str


@dataclass
class RequestAssessment:
    content: str


class TaxSpecialist(RoutedAgent):
    def __init__(
        self,
        description: str,
        specialty: TaxSpecialty,
        system_messages: List[SystemMessage],
    ) -> None:
        super().__init__(description)
        self.specialty = specialty
        self._system_messages = system_messages
        self._memory: List[ClientRequest] = []

    @message_handler
    async def handle_message(self, message: ClientRequest, ctx: MessageContext) -> None:
        # Process the client request.
        print(f"\n{'='*50}\nTax specialist {self.id} with specialty {self.specialty}:\n{message.content}")
        # Send a response back to the manager
        if ctx.topic_id is None:
            raise ValueError("Topic ID is required for broadcasting")
        await self.publish_message(
            message=RequestAssessment(content=f"I can handle this request in {self.specialty}."),
            topic_id=ctx.topic_id,
        )

1. 单租户、单发布范围#

场景解释#

在单租户、单发布范围的场景中

  • 所有 Agent 在单个租户中运行(例如,一个客户端或用户会话)。

  • 消息发布到单个主题,并且所有 Agent 订阅此主题。

  • 每个 Agent 接收发布到该主题的每条消息。

此场景适用于所有 Agent 都需要了解所有消息的情况,并且无需隔离不同 Agent 组或会话之间的通信。

在税务专家公司中的应用#

在我们的税务专家公司中,此场景意味着

  • 所有税务专家都会收到每个客户的请求和内部消息。

  • 所有 Agent 都密切协作,并且可以完全了解所有通信。

  • 适用于所有 Agent 都需要了解所有消息的任务或团队。

场景的工作方式#

  • 订阅:所有 Agent 都使用默认订阅(例如,“default”)。

  • 发布:消息发布到默认主题。

  • 消息处理:每个 Agent 根据消息的内容和可用的处理程序来决定是否对消息执行操作。

优点#

  • 简单性:易于设置和理解。

  • 协作:促进 Agent 之间的透明度和协作。

  • 灵活性:Agent 可以动态地决定要处理哪些消息。

注意事项#

  • 可伸缩性:可能无法很好地扩展到大量 Agent 或消息。

  • 效率:Agent 可能会收到许多不相关的消息,从而导致不必要的处理。

async def run_single_tenant_single_scope() -> None:
    # Create the runtime.
    runtime = SingleThreadedAgentRuntime()

    # Register TaxSpecialist agents for each specialty
    specialist_agent_type_1 = "TaxSpecialist_1"
    specialist_agent_type_2 = "TaxSpecialist_2"
    await TaxSpecialist.register(
        runtime=runtime,
        type=specialist_agent_type_1,
        factory=lambda: TaxSpecialist(
            description="A tax specialist 1",
            specialty=TaxSpecialty.PLANNING,
            system_messages=[SystemMessage(content="You are a tax specialist.")],
        ),
    )

    await TaxSpecialist.register(
        runtime=runtime,
        type=specialist_agent_type_2,
        factory=lambda: TaxSpecialist(
            description="A tax specialist 2",
            specialty=TaxSpecialty.DISPUTE_RESOLUTION,
            system_messages=[SystemMessage(content="You are a tax specialist.")],
        ),
    )

    # Add default subscriptions for each agent type
    await runtime.add_subscription(DefaultSubscription(agent_type=specialist_agent_type_1))
    await runtime.add_subscription(DefaultSubscription(agent_type=specialist_agent_type_2))

    # Start the runtime and send a message to agents on default topic
    runtime.start()
    await runtime.publish_message(ClientRequest("I need to have my tax for 2024 prepared."), topic_id=DefaultTopicId())
    await runtime.stop_when_idle()


await run_single_tenant_single_scope()
==================================================
Tax specialist TaxSpecialist_1:default with specialty TaxSpecialty.PLANNING:
I need to have my tax for 2024 prepared.

==================================================
Tax specialist TaxSpecialist_2:default with specialty TaxSpecialty.DISPUTE_RESOLUTION:
I need to have my tax for 2024 prepared.

2. 多租户、单发布范围#

场景解释#

在多租户、单发布范围的场景中

  • 存在多个租户(例如,多个客户端或用户会话)。

  • 每个租户都通过主题源拥有自己的隔离主题。

  • 租户内的所有 Agent 都订阅该租户的主题。如果需要,会为每个租户创建新的 Agent 实例。

  • 消息仅对同一租户内的 Agent 可见。

当您需要隔离不同租户之间的通信,但希望租户内的所有 Agent 都了解所有消息时,此场景很有用。

在税务专家公司中的应用#

在此场景中

  • 该公司同时为多个客户(租户)提供服务。

  • 对于每个客户,都会创建一组专用的 Agent 实例。

  • 每个客户的通信都与其他客户隔离。

  • 客户的所有 Agent 都会收到发布到该客户主题的消息。

场景的工作方式#

  • 订阅:Agent 根据租户的身份订阅主题。

  • 发布:消息发布到特定于租户的主题。

  • 消息处理:Agent 仅接收与其租户相关的消息。

优点#

  • 租户隔离:确保客户之间的数据隐私和隔离。

  • 租户内协作:Agent 可以在其租户内自由协作。

注意事项#

  • 复杂性:需要管理多组 Agent 和主题。

  • 资源使用:更多 Agent 实例可能会消耗更多资源。

async def run_multi_tenant_single_scope() -> None:
    # Create the runtime
    runtime = SingleThreadedAgentRuntime()

    # List of clients (tenants)
    tenants = ["ClientABC", "ClientXYZ"]

    # Initialize sessions and map the topic type to each TaxSpecialist agent type
    for specialty in TaxSpecialty:
        specialist_agent_type = f"TaxSpecialist_{specialty.value}"
        await TaxSpecialist.register(
            runtime=runtime,
            type=specialist_agent_type,
            factory=lambda specialty=specialty: TaxSpecialist(  # type: ignore
                description=f"A tax specialist in {specialty.value}.",
                specialty=specialty,
                system_messages=[SystemMessage(content=f"You are a tax specialist in {specialty.value}.")],
            ),
        )
        specialist_subscription = DefaultSubscription(agent_type=specialist_agent_type)
        await runtime.add_subscription(specialist_subscription)

    # Start the runtime
    runtime.start()

    # Publish client requests to their respective topics
    for tenant in tenants:
        topic_source = tenant  # The topic source is the client name
        topic_id = DefaultTopicId(source=topic_source)
        await runtime.publish_message(
            ClientRequest(f"{tenant} requires tax services."),
            topic_id=topic_id,
        )

    # Allow time for message processing
    await asyncio.sleep(1)

    # Stop the runtime when idle
    await runtime.stop_when_idle()


await run_multi_tenant_single_scope()
==================================================
Tax specialist TaxSpecialist_planning:ClientABC with specialty TaxSpecialty.PLANNING:
ClientABC requires tax services.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientABC with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientABC requires tax services.

==================================================
Tax specialist TaxSpecialist_compliance:ClientABC with specialty TaxSpecialty.COMPLIANCE:
ClientABC requires tax services.

==================================================
Tax specialist TaxSpecialist_preparation:ClientABC with specialty TaxSpecialty.PREPARATION:
ClientABC requires tax services.

==================================================
Tax specialist TaxSpecialist_planning:ClientXYZ with specialty TaxSpecialty.PLANNING:
ClientXYZ requires tax services.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientXYZ with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientXYZ requires tax services.

==================================================
Tax specialist TaxSpecialist_compliance:ClientXYZ with specialty TaxSpecialty.COMPLIANCE:
ClientXYZ requires tax services.

==================================================
Tax specialist TaxSpecialist_preparation:ClientXYZ with specialty TaxSpecialty.PREPARATION:
ClientXYZ requires tax services.

3. 单租户、多发布范围#

场景解释#

在单租户、多发布范围的场景中

  • 所有 Agent 在单个租户中运行。

  • 消息发布到不同的主题。

  • Agent 订阅与其角色或专业相关的特定主题。

  • 消息根据主题定向到 Agent 的子集。

此场景允许租户内的目标通信,从而可以更精细地控制消息分发。

在税务管理公司中的应用#

在此场景中

  • 税务系统经理根据特定专家的专长与其进行沟通。

  • 不同的主题代表不同的专长(例如,“planning”、“compliance”)。

  • 专家仅订阅与其专长相匹配的主题。

  • 经理将消息发布到特定主题,以联系到预期的专家。

场景的工作方式#

  • 订阅:Agent 订阅与其专长相对应的主题。

  • 发布:消息基于预期的接收者发布到主题。

  • 消息处理:只有订阅了主题的 Agent 才会收到其消息。

优点#

  • 目标通信:消息仅发送给相关的 Agent。

  • 效率:减少 Agent 不必要的消息处理。

注意事项#

  • 设置复杂性:需要仔细管理主题和订阅。

  • 灵活性:通信场景的变化可能需要更新订阅。

async def run_single_tenant_multiple_scope() -> None:
    # Create the runtime
    runtime = SingleThreadedAgentRuntime()
    # Register TaxSpecialist agents for each specialty and add subscriptions
    for specialty in TaxSpecialty:
        specialist_agent_type = f"TaxSpecialist_{specialty.value}"
        await TaxSpecialist.register(
            runtime=runtime,
            type=specialist_agent_type,
            factory=lambda specialty=specialty: TaxSpecialist(  # type: ignore
                description=f"A tax specialist in {specialty.value}.",
                specialty=specialty,
                system_messages=[SystemMessage(content=f"You are a tax specialist in {specialty.value}.")],
            ),
        )
        specialist_subscription = TypeSubscription(topic_type=specialty.value, agent_type=specialist_agent_type)
        await runtime.add_subscription(specialist_subscription)

    # Start the runtime
    runtime.start()

    # Publish a ClientRequest to each specialist's topic
    for specialty in TaxSpecialty:
        topic_id = TopicId(type=specialty.value, source="default")
        await runtime.publish_message(
            ClientRequest(f"I need assistance with {specialty.value} taxes."),
            topic_id=topic_id,
        )

    # Allow time for message processing
    await asyncio.sleep(1)

    # Stop the runtime when idle
    await runtime.stop_when_idle()


await run_single_tenant_multiple_scope()
==================================================
Tax specialist TaxSpecialist_planning:default with specialty TaxSpecialty.PLANNING:
I need assistance with planning taxes.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:default with specialty TaxSpecialty.DISPUTE_RESOLUTION:
I need assistance with dispute_resolution taxes.

==================================================
Tax specialist TaxSpecialist_compliance:default with specialty TaxSpecialty.COMPLIANCE:
I need assistance with compliance taxes.

==================================================
Tax specialist TaxSpecialist_preparation:default with specialty TaxSpecialty.PREPARATION:
I need assistance with preparation taxes.

4. 多租户、多发布范围#

场景解释#

在多租户、多发布范围的场景中

  • 有多个租户,每个租户都有自己的 Agent 集。

  • 消息发布到每个租户内的多个主题。

  • Agent 订阅与其角色相关的特定于租户的主题。

  • 结合了租户隔离和目标通信。

此场景提供对消息分发的最高级别的控制,适用于具有多个客户端和专业通信需求的复杂系统。

在税务管理公司中的应用#

在此场景中

  • 该公司为多个客户提供服务,每个客户都有专用的 Agent 实例。

  • 在每个客户中,Agent 使用多个基于专长的主题进行通信。

  • 例如,Client A 的规划专家订阅带有源“ClientA”的“规划”主题。

  • 每个客户的税务系统经理使用特定于租户的主题与其专家进行通信。

场景的工作方式#

  • 订阅:Agent 根据租户身份和专长订阅主题。

  • 发布:消息发布到特定于租户和特定于专长的主题。

  • 消息处理:只有与租户和主题匹配的 Agent 才会收到消息。

优点#

  • 完全隔离:确保租户和通信隔离。

  • 精细控制:可以精确地将消息路由到预期的 Agent。

注意事项#

  • 复杂性:需要仔细管理主题、租户和订阅。

  • 资源使用:增加的 Agent 实例和主题数量可能会影响资源。

async def run_multi_tenant_multiple_scope() -> None:
    # Create the runtime
    runtime = SingleThreadedAgentRuntime()

    # Define TypeSubscriptions for each specialty and tenant
    tenants = ["ClientABC", "ClientXYZ"]

    # Initialize agents for all specialties and add type subscriptions
    for specialty in TaxSpecialty:
        specialist_agent_type = f"TaxSpecialist_{specialty.value}"
        await TaxSpecialist.register(
            runtime=runtime,
            type=specialist_agent_type,
            factory=lambda specialty=specialty: TaxSpecialist(  # type: ignore
                description=f"A tax specialist in {specialty.value}.",
                specialty=specialty,
                system_messages=[SystemMessage(content=f"You are a tax specialist in {specialty.value}.")],
            ),
        )
        for tenant in tenants:
            specialist_subscription = TypeSubscription(
                topic_type=f"{tenant}_{specialty.value}", agent_type=specialist_agent_type
            )
            await runtime.add_subscription(specialist_subscription)

    # Start the runtime
    runtime.start()

    # Send messages for each tenant to each specialty
    for tenant in tenants:
        for specialty in TaxSpecialty:
            topic_id = TopicId(type=f"{tenant}_{specialty.value}", source=tenant)
            await runtime.publish_message(
                ClientRequest(f"{tenant} needs assistance with {specialty.value} taxes."),
                topic_id=topic_id,
            )

    # Allow time for message processing
    await asyncio.sleep(1)

    # Stop the runtime when idle
    await runtime.stop_when_idle()


await run_multi_tenant_multiple_scope()
==================================================
Tax specialist TaxSpecialist_planning:ClientABC with specialty TaxSpecialty.PLANNING:
ClientABC needs assistance with planning taxes.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientABC with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientABC needs assistance with dispute_resolution taxes.

==================================================
Tax specialist TaxSpecialist_compliance:ClientABC with specialty TaxSpecialty.COMPLIANCE:
ClientABC needs assistance with compliance taxes.

==================================================
Tax specialist TaxSpecialist_preparation:ClientABC with specialty TaxSpecialty.PREPARATION:
ClientABC needs assistance with preparation taxes.

==================================================
Tax specialist TaxSpecialist_planning:ClientXYZ with specialty TaxSpecialty.PLANNING:
ClientXYZ needs assistance with planning taxes.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientXYZ with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientXYZ needs assistance with dispute_resolution taxes.

==================================================
Tax specialist TaxSpecialist_compliance:ClientXYZ with specialty TaxSpecialty.COMPLIANCE:
ClientXYZ needs assistance with compliance taxes.

==================================================
Tax specialist TaxSpecialist_preparation:ClientXYZ with specialty TaxSpecialty.PREPARATION:
ClientXYZ needs assistance with preparation taxes.