主题和订阅示例场景#

简介#

在本指南中,我们将通过四个不同的广播场景探讨 AutoGen 中代理通信的广播机制。这些场景展示了处理和分发代理之间消息的各种方式。我们将使用一个税务管理公司处理客户请求的持续示例来演示每个场景。

场景概述#

想象一家税务管理公司,为客户提供各种服务,例如税务规划、争议解决、合规和报税。该公司雇佣了一支税务专家团队,每位专家都精通其中一个领域,以及一名负责监督运营的税务系统经理。

客户提交的请求需要由适当的专家进行处理。客户、税务系统经理和税务专家之间的通信通过此系统中的广播来处理。

我们将探讨不同的广播场景如何影响消息在代理之间的分发方式,以及如何根据特定需求调整通信流。


广播场景概述#

我们将涵盖以下广播场景:

  1. 单租户,单一发布范围

  2. 多租户,单一发布范围

  3. 单租户,多发布范围

  4. 多租户,多发布范围

每个场景都代表了系统中消息分发和代理交互的不同方法。通过理解这些场景,您可以设计最适合您应用程序需求的代理通信策略。

import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import List

from autogen_core import (
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    TopicId,
    TypeSubscription,
    message_handler,
)
from autogen_core._default_subscription import DefaultSubscription
from autogen_core._default_topic import DefaultTopicId
from autogen_core.models import (
    SystemMessage,
)
class TaxSpecialty(str, Enum):
    PLANNING = "planning"
    DISPUTE_RESOLUTION = "dispute_resolution"
    COMPLIANCE = "compliance"
    PREPARATION = "preparation"


@dataclass
class ClientRequest:
    content: str


@dataclass
class RequestAssessment:
    content: str


class TaxSpecialist(RoutedAgent):
    def __init__(
        self,
        description: str,
        specialty: TaxSpecialty,
        system_messages: List[SystemMessage],
    ) -> None:
        super().__init__(description)
        self.specialty = specialty
        self._system_messages = system_messages
        self._memory: List[ClientRequest] = []

    @message_handler
    async def handle_message(self, message: ClientRequest, ctx: MessageContext) -> None:
        # Process the client request.
        print(f"\n{'='*50}\nTax specialist {self.id} with specialty {self.specialty}:\n{message.content}")
        # Send a response back to the manager
        if ctx.topic_id is None:
            raise ValueError("Topic ID is required for broadcasting")
        await self.publish_message(
            message=RequestAssessment(content=f"I can handle this request in {self.specialty}."),
            topic_id=ctx.topic_id,
        )

1. 单租户,单一发布范围#

场景解释#

在单租户、单一发布范围场景中:

  • 所有代理都在单个租户(例如,一个客户端或用户会话)内运行。

  • 消息发布到一个单一的主题,所有代理都订阅此主题。

  • 每个代理都会收到发布到该主题的每条消息。

此场景适用于所有代理都需要了解所有消息的情况,并且无需隔离不同代理组或会话之间的通信。

在税务专家公司的应用#

在我们的税务专家公司中,此场景意味着:

  • 所有税务专家都会收到每个客户请求和内部消息。

  • 所有代理密切协作,对所有通信都具有完全的可见性。

  • 适用于所有代理都需要了解所有消息的任务或团队。

场景如何运作#

  • 订阅:所有代理使用默认订阅(例如,“default”)。

  • 发布:消息发布到默认主题。

  • 消息处理:每个代理根据其内容和可用处理程序决定是否对消息进行操作。

优点#

  • 简单性:易于设置和理解。

  • 协作:促进代理之间的透明度和协作。

  • 灵活性:代理可以动态决定处理哪些消息。

注意事项#

  • 可扩展性:对于大量代理或消息,可能无法很好地扩展。

  • 效率:代理可能会收到许多不相关的消息,导致不必要的处理。

async def run_single_tenant_single_scope() -> None:
    # Create the runtime.
    runtime = SingleThreadedAgentRuntime()

    # Register TaxSpecialist agents for each specialty
    specialist_agent_type_1 = "TaxSpecialist_1"
    specialist_agent_type_2 = "TaxSpecialist_2"
    await TaxSpecialist.register(
        runtime=runtime,
        type=specialist_agent_type_1,
        factory=lambda: TaxSpecialist(
            description="A tax specialist 1",
            specialty=TaxSpecialty.PLANNING,
            system_messages=[SystemMessage(content="You are a tax specialist.")],
        ),
    )

    await TaxSpecialist.register(
        runtime=runtime,
        type=specialist_agent_type_2,
        factory=lambda: TaxSpecialist(
            description="A tax specialist 2",
            specialty=TaxSpecialty.DISPUTE_RESOLUTION,
            system_messages=[SystemMessage(content="You are a tax specialist.")],
        ),
    )

    # Add default subscriptions for each agent type
    await runtime.add_subscription(DefaultSubscription(agent_type=specialist_agent_type_1))
    await runtime.add_subscription(DefaultSubscription(agent_type=specialist_agent_type_2))

    # Start the runtime and send a message to agents on default topic
    runtime.start()
    await runtime.publish_message(ClientRequest("I need to have my tax for 2024 prepared."), topic_id=DefaultTopicId())
    await runtime.stop_when_idle()


await run_single_tenant_single_scope()
==================================================
Tax specialist TaxSpecialist_1:default with specialty TaxSpecialty.PLANNING:
I need to have my tax for 2024 prepared.

==================================================
Tax specialist TaxSpecialist_2:default with specialty TaxSpecialty.DISPUTE_RESOLUTION:
I need to have my tax for 2024 prepared.

2. 多租户,单一发布范围#

场景解释#

在多租户、单一发布范围场景中:

  • 存在多个租户(例如,多个客户端或用户会话)。

  • 每个租户通过主题源拥有自己的隔离主题。

  • 租户内的所有代理都订阅该租户的主题。如果需要,会为每个租户创建新的代理实例。

  • 消息仅对同一租户内的代理可见。

当您需要隔离不同租户之间的通信,但希望租户内的所有代理都了解所有消息时,此场景非常有用。

在税务专家公司的应用#

在此场景中:

  • 公司同时服务多个客户(租户)。

  • 为每个客户创建一套专用的代理实例。

  • 每个客户的通信都与其他客户隔离。

  • 某个客户的所有代理都会收到发布到该客户主题的消息。

场景如何运作#

  • 订阅:代理根据租户的身份订阅主题。

  • 发布:消息发布到租户特定的主题。

  • 消息处理:代理只接收与其租户相关的消息。

优点#

  • 租户隔离:确保客户之间的数据隐私和分离。

  • 租户内协作:代理可以在其租户内自由协作。

注意事项#

  • 复杂性:需要管理多套代理和主题。

  • 资源使用:更多的代理实例可能会消耗额外的资源。

async def run_multi_tenant_single_scope() -> None:
    # Create the runtime
    runtime = SingleThreadedAgentRuntime()

    # List of clients (tenants)
    tenants = ["ClientABC", "ClientXYZ"]

    # Initialize sessions and map the topic type to each TaxSpecialist agent type
    for specialty in TaxSpecialty:
        specialist_agent_type = f"TaxSpecialist_{specialty.value}"
        await TaxSpecialist.register(
            runtime=runtime,
            type=specialist_agent_type,
            factory=lambda specialty=specialty: TaxSpecialist(  # type: ignore
                description=f"A tax specialist in {specialty.value}.",
                specialty=specialty,
                system_messages=[SystemMessage(content=f"You are a tax specialist in {specialty.value}.")],
            ),
        )
        specialist_subscription = DefaultSubscription(agent_type=specialist_agent_type)
        await runtime.add_subscription(specialist_subscription)

    # Start the runtime
    runtime.start()

    # Publish client requests to their respective topics
    for tenant in tenants:
        topic_source = tenant  # The topic source is the client name
        topic_id = DefaultTopicId(source=topic_source)
        await runtime.publish_message(
            ClientRequest(f"{tenant} requires tax services."),
            topic_id=topic_id,
        )

    # Allow time for message processing
    await asyncio.sleep(1)

    # Stop the runtime when idle
    await runtime.stop_when_idle()


await run_multi_tenant_single_scope()
==================================================
Tax specialist TaxSpecialist_planning:ClientABC with specialty TaxSpecialty.PLANNING:
ClientABC requires tax services.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientABC with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientABC requires tax services.

==================================================
Tax specialist TaxSpecialist_compliance:ClientABC with specialty TaxSpecialty.COMPLIANCE:
ClientABC requires tax services.

==================================================
Tax specialist TaxSpecialist_preparation:ClientABC with specialty TaxSpecialty.PREPARATION:
ClientABC requires tax services.

==================================================
Tax specialist TaxSpecialist_planning:ClientXYZ with specialty TaxSpecialty.PLANNING:
ClientXYZ requires tax services.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientXYZ with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientXYZ requires tax services.

==================================================
Tax specialist TaxSpecialist_compliance:ClientXYZ with specialty TaxSpecialty.COMPLIANCE:
ClientXYZ requires tax services.

==================================================
Tax specialist TaxSpecialist_preparation:ClientXYZ with specialty TaxSpecialty.PREPARATION:
ClientXYZ requires tax services.

3. 单租户,多发布范围#

场景解释#

在单租户、多发布范围场景中:

  • 所有代理都在单个租户内运行。

  • 消息发布到不同的主题。

  • 代理订阅与其角色或专业相关的特定主题。

  • 消息根据主题定向到代理的子集。

此场景允许租户内的定向通信,从而实现对消息分发更精细的控制。

在税务管理公司的应用#

在此场景中:

  • 税务系统经理根据专家的专业与特定专家进行沟通。

  • 不同的主题代表不同的专业(例如,“规划”、“合规”)。

  • 专家只订阅与其专业匹配的主题。

  • 经理将消息发布到特定主题以触达预期的专家。

场景如何运作#

  • 订阅:代理订阅与其专业相对应的主题。

  • 发布:消息根据预期接收者发布到主题。

  • 消息处理:只有订阅了某个主题的代理才会收到其消息。

优点#

  • 定向通信:消息只发送给相关的代理。

  • 效率:减少代理不必要的消息处理。

注意事项#

  • 设置复杂性:需要仔细管理主题和订阅。

  • 灵活性:通信场景的变化可能需要更新订阅。

async def run_single_tenant_multiple_scope() -> None:
    # Create the runtime
    runtime = SingleThreadedAgentRuntime()
    # Register TaxSpecialist agents for each specialty and add subscriptions
    for specialty in TaxSpecialty:
        specialist_agent_type = f"TaxSpecialist_{specialty.value}"
        await TaxSpecialist.register(
            runtime=runtime,
            type=specialist_agent_type,
            factory=lambda specialty=specialty: TaxSpecialist(  # type: ignore
                description=f"A tax specialist in {specialty.value}.",
                specialty=specialty,
                system_messages=[SystemMessage(content=f"You are a tax specialist in {specialty.value}.")],
            ),
        )
        specialist_subscription = TypeSubscription(topic_type=specialty.value, agent_type=specialist_agent_type)
        await runtime.add_subscription(specialist_subscription)

    # Start the runtime
    runtime.start()

    # Publish a ClientRequest to each specialist's topic
    for specialty in TaxSpecialty:
        topic_id = TopicId(type=specialty.value, source="default")
        await runtime.publish_message(
            ClientRequest(f"I need assistance with {specialty.value} taxes."),
            topic_id=topic_id,
        )

    # Allow time for message processing
    await asyncio.sleep(1)

    # Stop the runtime when idle
    await runtime.stop_when_idle()


await run_single_tenant_multiple_scope()
==================================================
Tax specialist TaxSpecialist_planning:default with specialty TaxSpecialty.PLANNING:
I need assistance with planning taxes.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:default with specialty TaxSpecialty.DISPUTE_RESOLUTION:
I need assistance with dispute_resolution taxes.

==================================================
Tax specialist TaxSpecialist_compliance:default with specialty TaxSpecialty.COMPLIANCE:
I need assistance with compliance taxes.

==================================================
Tax specialist TaxSpecialist_preparation:default with specialty TaxSpecialty.PREPARATION:
I need assistance with preparation taxes.

4. 多租户,多发布范围#

场景解释#

在多租户、多发布范围场景中:

  • 存在多个租户,每个租户都有自己的一组代理。

  • 消息在每个租户内发布到多个主题。

  • 代理订阅与其角色相关的租户特定主题。

  • 结合了租户隔离和定向通信。

此场景提供了最高级别的消息分发控制,适用于具有多个客户端和专业通信需求的复杂系统。

在税务管理公司的应用#

在此场景中:

  • 公司服务多个客户,每个客户都有专用的代理实例。

  • 在每个客户内部,代理根据专业使用多个主题进行通信。

  • 例如,客户 A 的规划专家订阅源为“ClientA”的“planning”主题。

  • 每个客户的税务系统经理使用租户特定主题与他们的专家进行通信。

场景如何运作#

  • 订阅:代理根据租户身份和专业订阅主题。

  • 发布:消息发布到租户特定和专业特定主题。

  • 消息处理:只有匹配租户和主题的代理才会收到消息。

优点#

  • 完全隔离:确保租户和通信隔离。

  • 精细控制:实现消息精确路由到目标代理。

注意事项#

  • 复杂性:需要仔细管理主题、租户和订阅。

  • 资源使用:增加的代理实例和主题数量可能会影响资源。

async def run_multi_tenant_multiple_scope() -> None:
    # Create the runtime
    runtime = SingleThreadedAgentRuntime()

    # Define TypeSubscriptions for each specialty and tenant
    tenants = ["ClientABC", "ClientXYZ"]

    # Initialize agents for all specialties and add type subscriptions
    for specialty in TaxSpecialty:
        specialist_agent_type = f"TaxSpecialist_{specialty.value}"
        await TaxSpecialist.register(
            runtime=runtime,
            type=specialist_agent_type,
            factory=lambda specialty=specialty: TaxSpecialist(  # type: ignore
                description=f"A tax specialist in {specialty.value}.",
                specialty=specialty,
                system_messages=[SystemMessage(content=f"You are a tax specialist in {specialty.value}.")],
            ),
        )
        for tenant in tenants:
            specialist_subscription = TypeSubscription(
                topic_type=f"{tenant}_{specialty.value}", agent_type=specialist_agent_type
            )
            await runtime.add_subscription(specialist_subscription)

    # Start the runtime
    runtime.start()

    # Send messages for each tenant to each specialty
    for tenant in tenants:
        for specialty in TaxSpecialty:
            topic_id = TopicId(type=f"{tenant}_{specialty.value}", source=tenant)
            await runtime.publish_message(
                ClientRequest(f"{tenant} needs assistance with {specialty.value} taxes."),
                topic_id=topic_id,
            )

    # Allow time for message processing
    await asyncio.sleep(1)

    # Stop the runtime when idle
    await runtime.stop_when_idle()


await run_multi_tenant_multiple_scope()
==================================================
Tax specialist TaxSpecialist_planning:ClientABC with specialty TaxSpecialty.PLANNING:
ClientABC needs assistance with planning taxes.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientABC with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientABC needs assistance with dispute_resolution taxes.

==================================================
Tax specialist TaxSpecialist_compliance:ClientABC with specialty TaxSpecialty.COMPLIANCE:
ClientABC needs assistance with compliance taxes.

==================================================
Tax specialist TaxSpecialist_preparation:ClientABC with specialty TaxSpecialty.PREPARATION:
ClientABC needs assistance with preparation taxes.

==================================================
Tax specialist TaxSpecialist_planning:ClientXYZ with specialty TaxSpecialty.PLANNING:
ClientXYZ needs assistance with planning taxes.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientXYZ with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientXYZ needs assistance with dispute_resolution taxes.

==================================================
Tax specialist TaxSpecialist_compliance:ClientXYZ with specialty TaxSpecialty.COMPLIANCE:
ClientXYZ needs assistance with compliance taxes.

==================================================
Tax specialist TaxSpecialist_preparation:ClientXYZ with specialty TaxSpecialty.PREPARATION:
ClientXYZ needs assistance with preparation taxes.