autogen_agentchat.teams#

此模块提供了各种预定义多智能体团队的实现。每个团队都继承自 BaseGroupChat 类。

class BaseGroupChat(name: str, description: str, participants: List[ChatAgent | Team], group_chat_manager_name: str, group_chat_manager_class: type[SequentialRoutedAgent], termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False)[source]#

基类:TeamABCComponentBase[BaseModel]

群聊团队的基类。

在群聊团队中,参与者通过向所有其他参与者发布消息来共享上下文。

如果 ChatAgent 是参与者,则来自代理响应的 BaseChatMessagechat_message 将发布到群聊中的其他参与者。

如果 Team 是参与者,则来自团队结果的 BaseChatMessagemessages 将发布到群聊中的其他参与者。

要实现群聊团队,首先创建 BaseGroupChatManager 的子类,然后创建使用该群聊管理器的 BaseGroupChat 的子类。

此基类提供 AgentChat API 的代理与核心 API 的代理运行时之间的映射,并处理运行、暂停、恢复和重置团队等高级功能。

component_type: ClassVar[ComponentType] = 'team'#

组件的逻辑类型。

property name: str#

群聊团队的名称。

property description: str#

群聊团队的描述。

async run(*, task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None, cancellation_token: CancellationToken | None = None, output_task_messages: bool = True) TaskResult[source]#

运行团队并返回结果。基本实现使用 run_stream() 运行团队,然后返回最终结果。团队停止后,终止条件会重置。

参数:
  • task (str | BaseChatMessage | Sequence[BaseChatMessage] | None) – 用于运行团队的任务。可以是字符串、单个 BaseChatMessage,或 BaseChatMessage 列表。

  • cancellation_token (CancellationToken | None) – 用于立即终止任务的取消令牌。设置取消令牌可能会使团队处于不一致状态,并且可能不会重置终止条件。要优雅地停止团队,请改用 ExternalTermination

返回:

result – 任务结果为 TaskResult。结果包含团队生成的消息和停止原因。

使用 RoundRobinGroupChat 团队的示例

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)
    termination = MaxMessageTermination(3)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

    result = await team.run(task="Count from 1 to 10, respond one at a time.")
    print(result)

    # Run the team again without a task to continue the previous task.
    result = await team.run()
    print(result)


asyncio.run(main())

使用 CancellationToken 取消任务的示例

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)
    termination = MaxMessageTermination(3)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

    cancellation_token = CancellationToken()

    # Create a task to run the team in the background.
    run_task = asyncio.create_task(
        team.run(
            task="Count from 1 to 10, respond one at a time.",
            cancellation_token=cancellation_token,
        )
    )

    # Wait for 1 second and then cancel the task.
    await asyncio.sleep(1)
    cancellation_token.cancel()

    # This will raise a cancellation error.
    await run_task


asyncio.run(main())
async run_stream(*, task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None, cancellation_token: CancellationToken | None = None, output_task_messages: bool = True) AsyncGenerator[BaseAgentEvent | BaseChatMessage | TaskResult, None][source]#

运行团队并生成消息流,以及作为流中最后一个项目的 TaskResult 类型的最终结果。团队停止后,终止条件会重置。

注意

如果代理生成 ModelClientStreamingChunkEvent,消息将在流中生成,但不会包含在 messages 中。

参数:
  • task (str | BaseChatMessage | Sequence[BaseChatMessage] | None) – 用于运行团队的任务。可以是字符串、单个 BaseChatMessage,或 BaseChatMessage 列表。

  • cancellation_token (CancellationToken | None) – 用于立即终止任务的取消令牌。设置取消令牌可能会使团队处于不一致状态,并且可能不会重置终止条件。要优雅地停止团队,请改用 ExternalTermination

  • output_task_messages (bool) – 是否在输出流中包含任务消息。为向后兼容,默认为 True。

返回:

stream – 一个 AsyncGenerator,它生成 BaseAgentEventBaseChatMessage,以及作为流中最后一个项目的最终结果 TaskResult

使用 RoundRobinGroupChat 团队的示例

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)
    termination = MaxMessageTermination(3)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

    stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
    async for message in stream:
        print(message)

    # Run the team again without a task to continue the previous task.
    stream = team.run_stream()
    async for message in stream:
        print(message)


asyncio.run(main())

使用 CancellationToken 取消任务的示例

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.ui import Console
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)
    termination = MaxMessageTermination(3)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

    cancellation_token = CancellationToken()

    # Create a task to run the team in the background.
    run_task = asyncio.create_task(
        Console(
            team.run_stream(
                task="Count from 1 to 10, respond one at a time.",
                cancellation_token=cancellation_token,
            )
        )
    )

    # Wait for 1 second and then cancel the task.
    await asyncio.sleep(1)
    cancellation_token.cancel()

    # This will raise a cancellation error.
    await run_task


asyncio.run(main())
async reset() None[source]#

将团队及其参与者重置为初始状态。

团队必须在重置之前停止。

抛出:

RuntimeError – 如果团队尚未初始化或当前正在运行。

使用 RoundRobinGroupChat 团队的示例

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)
    termination = MaxMessageTermination(3)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)
    stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
    async for message in stream:
        print(message)

    # Reset the team.
    await team.reset()
    stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
    async for message in stream:
        print(message)


asyncio.run(main())
async pause() None#

当团队运行时,通过直接 RPC 调用其 on_pause() 方法来暂停其参与者。

注意

这是 v0.4.9 中引入的实验性功能,未来可能会更改或删除。

团队必须在暂停之前初始化。

与终止不同,暂停团队不会导致 run()run_stream() 方法返回。它会调用每个参与者的 on_pause() 方法,如果参与者未实现该方法,则会无操作。

注意

代理类负责处理暂停并确保代理稍后可以恢复。请务必在您的代理类中实现 on_pause() 方法以实现自定义暂停行为。默认情况下,代理在调用时不会执行任何操作。

抛出:

RuntimeError – 如果团队尚未初始化。调用参与者实现的 on_pause 时来自参与者的异常会传播到此方法并引发。

async resume() None[source]#

当团队正在运行并暂停时,通过直接 RPC 调用其 on_resume() 方法来恢复其参与者。

注意

这是 v0.4.9 中引入的实验性功能,未来可能会更改或删除。

团队必须在恢复之前初始化。

与终止和使用新任务重新启动不同,恢复团队不会导致 run()run_stream() 方法返回。它会调用每个参与者的 on_resume() 方法,如果参与者未实现该方法,则会无操作。

注意

代理类负责处理恢复并确保代理从暂停处继续。请务必在您的代理类中实现 on_resume() 方法以实现自定义恢复行为。

抛出:

RuntimeError – 如果团队尚未初始化。调用参与者实现的 on_resume 方法时来自参与者的异常会传播到此方法并引发。

async save_state() Mapping[str, Any][source]#

保存群聊团队的状态。

通过调用每个参与者和群聊管理器的 agent_save_state() 方法及其内部代理 ID 来保存状态。状态以嵌套字典形式返回:一个以键 agent_states 为键的字典,该字典又是一个以代理名称为键、以状态为值的字典。

{
    "agent_states": {
        "agent1": ...,
        "agent2": ...,
        "RoundRobinGroupChatManager": ...
    }
}

注意

从 v0.4.9 开始,状态使用代理名称作为键而不是代理 ID,并且状态中移除了 team_id 字段。这是为了使状态能够在不同的团队和运行时之间移植。使用旧格式保存的状态将来可能与新格式不兼容。

注意

当在团队运行时调用 save_state() 时,状态可能不一致并可能导致意外状态。建议在团队未运行或停止后调用此方法。

async load_state(state: Mapping[str, Any]) None[source]#

加载外部状态并覆盖群聊团队的当前状态。

通过调用每个参与者和群聊管理器的 agent_load_state() 方法及其内部代理 ID 来加载状态。有关状态的预期格式,请参阅 save_state()

class RoundRobinGroupChat(participants: List[ChatAgent | Team], *, name: str | None = None, description: str | None = None, termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False)[source]#

基类:BaseGroupChatComponent[RoundRobinGroupChatConfig]

一个团队,以轮询方式轮流向所有人发布消息来运行群聊。

如果 ChatAgent 是参与者,则来自代理响应的 BaseChatMessagechat_message 将发布到群聊中的其他参与者。

如果 Team 是参与者,则来自团队结果的 BaseChatMessagemessages 将发布到群聊中的其他参与者。

如果团队中只有一个参与者,则该参与者将是唯一的发言者。

参数:
  • participants (List[ChatAgent | Team]) – 群聊中的参与者。

  • name (str | None, optional) – 群聊的名称,如果未提供,则使用 DEFAULT_NAME。父团队使用该名称来识别此群聊,因此它必须在父团队中是唯一的。

  • description (str | None, optional) – 群聊的描述,如果未提供,则使用 DEFAULT_DESCRIPTION

  • termination_condition (TerminationCondition, optional) – 群聊的终止条件。默认为 None。如果没有终止条件,群聊将无限期运行。

  • max_turns (int, optional) – 群聊在停止前的最大轮次。默认为 None,表示无限制。

  • custom_message_types (List[type[BaseAgentEvent | BaseChatMessage]], optional) – 将在群聊中使用的自定义消息类型列表。如果您正在使用自定义消息类型或您的代理生成自定义消息类型,则需要在此处指定它们。确保您的自定义消息类型是 BaseAgentEventBaseChatMessage 的子类。

  • emit_team_events (bool, optional) – 是否通过 BaseGroupChat.run_stream() 发出团队事件。默认为 False。

抛出:

ValueError – 如果未提供参与者或参与者名称不唯一。

示例

一个拥有一个参与者和工具的团队

import asyncio
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.ui import Console


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    async def get_weather(location: str) -> str:
        return f"The weather in {location} is sunny."

    assistant = AssistantAgent(
        "Assistant",
        model_client=model_client,
        tools=[get_weather],
    )
    termination = TextMentionTermination("TERMINATE")
    team = RoundRobinGroupChat([assistant], termination_condition=termination)
    await Console(team.run_stream(task="What's the weather in New York?"))


asyncio.run(main())

一个拥有多个参与者的团队

import asyncio
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.ui import Console


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)
    termination = TextMentionTermination("TERMINATE")
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)
    await Console(team.run_stream(task="Tell me some jokes."))


asyncio.run(main())

由用户代理和嵌套的作者和审阅者代理团队组成的团队

import asyncio

from autogen_agentchat.agents import UserProxyAgent, AssistantAgent
from autogen_agentchat.conditions import TextMentionTermination, MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4.1-nano")

    writer = AssistantAgent(
        "writer", model_client=model_client, system_message="You are a writer.", model_client_stream=True
    )

    reviewer = AssistantAgent(
        "reviewer",
        model_client=model_client,
        system_message="Provide feedback to the input and suggest improvements.",
        model_client_stream=True,
    )

    # NOTE: you can skip input by pressing Enter.
    user_proxy = UserProxyAgent("user_proxy")

    # Maximum 1 round of review and revision.
    inner_termination = MaxMessageTermination(max_messages=4)

    # The outter-loop termination condition that will terminate the team when the user types "exit".
    outter_termination = TextMentionTermination("exit", sources=["user_proxy"])

    team = RoundRobinGroupChat(
        [
            # For each turn, the writer writes a summary and the reviewer reviews it.
            RoundRobinGroupChat([writer, reviewer], termination_condition=inner_termination),
            # The user proxy gets user input once the writer and reviewer have finished their actions.
            user_proxy,
        ],
        termination_condition=outter_termination,
    )
    # Start the team and wait for it to terminate.
    await Console(team.run_stream(task="Write a short essay about the impact of AI on society."))


asyncio.run(main())
component_config_schema#

别名 RoundRobinGroupChatConfig

component_provider_override: ClassVar[str | None] = 'autogen_agentchat.teams.RoundRobinGroupChat'#

覆盖组件的提供者字符串。这应该用于防止内部模块名称成为模块名称的一部分。

DEFAULT_NAME = 'RoundRobinGroupChat'#
DEFAULT_DESCRIPTION = 'A team of agents.'#
_to_config() RoundRobinGroupChatConfig[source]#

转储创建与此实例配置匹配的组件新实例所需的配置。

返回:

T – 组件的配置。

classmethod _from_config(config: RoundRobinGroupChatConfig) Self[source]#

从配置对象创建组件的新实例。

参数:

config (T) – 配置对象。

返回:

Self – 组件的新实例。

class SelectorGroupChat(participants: List[ChatAgent | Team], model_client: ChatCompletionClient, *, name: str | None = None, description: str | None = None, termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, selector_prompt: str = 'You are in a role play game. The following roles are available:\n{roles}.\nRead the following conversation. Then select the next role from {participants} to play. Only return the role.\n\n{history}\n\nRead the above conversation. Then select the next role from {participants} to play. Only return the role.\n', allow_repeated_speaker: bool = False, max_selector_attempts: int = 3, selector_func: Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], str | None] | Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], Awaitable[str | None]] | None = None, candidate_func: Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], List[str]] | Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], Awaitable[List[str]]] | None = None, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False, model_client_streaming: bool = False, model_context: ChatCompletionContext | None = None)[source]#

基类:BaseGroupChatComponent[SelectorGroupChatConfig]

一个群聊团队,参与者轮流向所有人发布消息,使用 ChatCompletion 模型选择下一个发言者。

如果 ChatAgent 是参与者,则来自代理响应的 BaseChatMessagechat_message 将发布到群聊中的其他参与者。

如果 Team 是参与者,则来自团队结果的 BaseChatMessagemessages 将发布到群聊中的其他参与者。

参数:
  • participants (List[ChatAgent | Team]) – 群聊中的参与者,必须具有唯一的名称且至少有两个参与者。

  • model_client (ChatCompletionClient) – 用于选择下一个发言者的 ChatCompletion 模型客户端。

  • name (str | None, optional) – 群聊的名称,如果未提供,则使用 DEFAULT_NAME。父团队使用该名称来识别此群聊,因此它必须在父团队中是唯一的。

  • description (str | None, optional) – 群聊的描述,如果未提供,则使用 DEFAULT_DESCRIPTION

  • termination_condition (TerminationCondition, optional) – 群聊的终止条件。默认为 None。如果没有终止条件,群聊将无限期运行。

  • max_turns (int, optional) – 群聊在停止前的最大轮次。默认为 None,表示无限制。

  • selector_prompt (str, optional) – 用于选择下一个发言者的提示模板。可用字段:‘{roles}’、‘{participants}’ 和 ‘{history}’。{participants} 是候选发言者的姓名。格式为 [“”, “”, …]{roles} 是由名称和候选代理描述组成的换行符分隔列表。每行的格式为: : {history} 是会话历史记录,格式为双换行符分隔的名称和消息内容。每条消息的格式为: :

  • allow_repeated_speaker (bool, optional) – 是否将上一个发言者包含在下一轮选择的候选者列表中。默认为 False。模型仍可能选择上一个发言者 – 如果发生这种情况,将记录警告。

  • max_selector_attempts (int, optional) – 使用模型选择发言者的最大尝试次数。默认为 3。如果模型在最大尝试次数后仍未能选择发言者,则将使用上一个发言者(如果可用),否则将使用第一个参与者。

  • selector_func (Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], str | None], Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], Awaitable[str | None]], optional) – 一个自定义选择器函数,它接受会话历史记录并返回下一个发言者的姓名。如果提供,此函数将用于覆盖模型以选择下一个发言者。如果函数返回 None,则将使用模型选择下一个发言者。注意:selector_func 不可序列化,在序列化和反序列化过程中将被忽略。

  • candidate_func (Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], List[str]], Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], Awaitable[List[str]]], optional) – 一个自定义函数,它接受会话历史记录并返回一个过滤后的候选列表,用于使用模型选择下一个发言者。如果函数返回空列表或 NoneSelectorGroupChat 将引发 ValueError。此函数仅在未设置 selector_func 时使用。allow_repeated_speaker 将被忽略。

  • custom_message_types (List[type[BaseAgentEvent | BaseChatMessage]], optional) – 将在群聊中使用的自定义消息类型列表。如果您正在使用自定义消息类型或您的代理生成自定义消息类型,则需要在此处指定它们。确保您的自定义消息类型是 BaseAgentEventBaseChatMessage 的子类。

  • emit_team_events (bool, optional) – 是否通过 BaseGroupChat.run_stream() 发出团队事件。默认为 False。

  • model_client_streaming (bool, optional) – 是否对模型客户端使用流式传输。(这对于像 QwQ 这样的推理模型很有用)。默认为 False。

  • model_context (ChatCompletionContext | None, optional) – 用于存储和检索 LLMMessage 的模型上下文。可以预加载初始消息。存储在模型上下文中的消息将用于发言者选择。当团队重置时,初始消息将被清除。

抛出:

ValueError – 如果参与者数量少于两个或选择器提示无效。

示例

一个拥有多个参与者的团队

import asyncio
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.ui import Console


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    async def lookup_hotel(location: str) -> str:
        return f"Here are some hotels in {location}: hotel1, hotel2, hotel3."

    async def lookup_flight(origin: str, destination: str) -> str:
        return f"Here are some flights from {origin} to {destination}: flight1, flight2, flight3."

    async def book_trip() -> str:
        return "Your trip is booked!"

    travel_advisor = AssistantAgent(
        "Travel_Advisor",
        model_client,
        tools=[book_trip],
        description="Helps with travel planning.",
    )
    hotel_agent = AssistantAgent(
        "Hotel_Agent",
        model_client,
        tools=[lookup_hotel],
        description="Helps with hotel booking.",
    )
    flight_agent = AssistantAgent(
        "Flight_Agent",
        model_client,
        tools=[lookup_flight],
        description="Helps with flight booking.",
    )
    termination = TextMentionTermination("TERMINATE")
    team = SelectorGroupChat(
        [travel_advisor, hotel_agent, flight_agent],
        model_client=model_client,
        termination_condition=termination,
    )
    await Console(team.run_stream(task="Book a 3-day trip to new york."))


asyncio.run(main())

一个带有自定义选择器函数的团队

import asyncio
from typing import Sequence
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.ui import Console
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    def check_calculation(x: int, y: int, answer: int) -> str:
        if x + y == answer:
            return "Correct!"
        else:
            return "Incorrect!"

    agent1 = AssistantAgent(
        "Agent1",
        model_client,
        description="For calculation",
        system_message="Calculate the sum of two numbers",
    )
    agent2 = AssistantAgent(
        "Agent2",
        model_client,
        tools=[check_calculation],
        description="For checking calculation",
        system_message="Check the answer and respond with 'Correct!' or 'Incorrect!'",
    )

    def selector_func(messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> str | None:
        if len(messages) == 1 or messages[-1].to_text() == "Incorrect!":
            return "Agent1"
        if messages[-1].source == "Agent1":
            return "Agent2"
        return None

    termination = TextMentionTermination("Correct!")
    team = SelectorGroupChat(
        [agent1, agent2],
        model_client=model_client,
        selector_func=selector_func,
        termination_condition=termination,
    )

    await Console(team.run_stream(task="What is 1 + 1?"))


asyncio.run(main())

一个带有自定义模型上下文的团队

import asyncio

from autogen_core.model_context import BufferedChatCompletionContext
from autogen_ext.models.openai import OpenAIChatCompletionClient

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.ui import Console


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    model_context = BufferedChatCompletionContext(buffer_size=5)

    async def lookup_hotel(location: str) -> str:
        return f"Here are some hotels in {location}: hotel1, hotel2, hotel3."

    async def lookup_flight(origin: str, destination: str) -> str:
        return f"Here are some flights from {origin} to {destination}: flight1, flight2, flight3."

    async def book_trip() -> str:
        return "Your trip is booked!"

    travel_advisor = AssistantAgent(
        "Travel_Advisor",
        model_client,
        tools=[book_trip],
        description="Helps with travel planning.",
    )
    hotel_agent = AssistantAgent(
        "Hotel_Agent",
        model_client,
        tools=[lookup_hotel],
        description="Helps with hotel booking.",
    )
    flight_agent = AssistantAgent(
        "Flight_Agent",
        model_client,
        tools=[lookup_flight],
        description="Helps with flight booking.",
    )
    termination = TextMentionTermination("TERMINATE")
    team = SelectorGroupChat(
        [travel_advisor, hotel_agent, flight_agent],
        model_client=model_client,
        termination_condition=termination,
        model_context=model_context,
    )
    await Console(team.run_stream(task="Book a 3-day trip to new york."))


asyncio.run(main())
component_config_schema#

别名 SelectorGroupChatConfig

component_provider_override: ClassVar[str | None] = 'autogen_agentchat.teams.SelectorGroupChat'#

覆盖组件的提供者字符串。这应该用于防止内部模块名称成为模块名称的一部分。

DEFAULT_NAME = 'SelectorGroupChat'#
DEFAULT_DESCRIPTION = 'A team of agents.'#
_to_config() SelectorGroupChatConfig[source]#

转储创建与此实例配置匹配的组件新实例所需的配置。

返回:

T – 组件的配置。

classmethod _from_config(config: SelectorGroupChatConfig) Self[source]#

从配置对象创建组件的新实例。

参数:

config (T) – 配置对象。

返回:

Self – 组件的新实例。

class Swarm(participants: List[ChatAgent], *, name: str | None = None, description: str | None = None, termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False)[source]#

基类:BaseGroupChat, Component[SwarmConfig]

一个只根据移交消息选择下一个发言人的群聊团队。

参与者列表中的第一个参与者是初始发言人。下一个发言人根据当前发言人发送的 HandoffMessage 消息选择。如果没有发送移交消息,则当前发言人继续作为发言人。

注意

RoundRobinGroupChatSelectorGroupChat 不同,此群聊团队不支持将内部团队作为参与者。

参数:
  • participants (List[ChatAgent]) – 参与群聊的代理。列表中的第一个代理是初始发言人。

  • name (str | None, optional) – 群聊的名称,如果未提供,则使用 DEFAULT_NAME。父团队使用该名称来识别此群聊,因此它在父团队中必须是唯一的。

  • description (str | None, optional) – 群聊的描述,如果未提供,则使用 DEFAULT_DESCRIPTION

  • termination_condition (TerminationCondition, optional) – 群聊的终止条件。默认为 None。如果没有终止条件,群聊将无限期运行。

  • max_turns (int, optional) – 群聊在停止前的最大轮次。默认为 None,表示无限制。

  • custom_message_types (List[type[BaseAgentEvent | BaseChatMessage]], optional) – 将在群聊中使用的自定义消息类型列表。如果您正在使用自定义消息类型或您的代理生成自定义消息类型,则需要在此处指定它们。确保您的自定义消息类型是 BaseAgentEventBaseChatMessage 的子类。

  • emit_team_events (bool, optional) – 是否通过 BaseGroupChat.run_stream() 发出团队事件。默认为 False。

基本示例

import asyncio
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import Swarm
from autogen_agentchat.conditions import MaxMessageTermination


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent(
        "Alice",
        model_client=model_client,
        handoffs=["Bob"],
        system_message="You are Alice and you only answer questions about yourself.",
    )
    agent2 = AssistantAgent(
        "Bob", model_client=model_client, system_message="You are Bob and your birthday is on 1st January."
    )

    termination = MaxMessageTermination(3)
    team = Swarm([agent1, agent2], termination_condition=termination)

    stream = team.run_stream(task="What is bob's birthday?")
    async for message in stream:
        print(message)


asyncio.run(main())

使用 HandoffTermination 进行人工干预移交

import asyncio
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import Swarm
from autogen_agentchat.conditions import HandoffTermination, MaxMessageTermination
from autogen_agentchat.ui import Console
from autogen_agentchat.messages import HandoffMessage


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent = AssistantAgent(
        "Alice",
        model_client=model_client,
        handoffs=["user"],
        system_message="You are Alice and you only answer questions about yourself, ask the user for help if needed.",
    )
    termination = HandoffTermination(target="user") | MaxMessageTermination(3)
    team = Swarm([agent], termination_condition=termination)

    # Start the conversation.
    await Console(team.run_stream(task="What is bob's birthday?"))

    # Resume with user feedback.
    await Console(
        team.run_stream(
            task=HandoffMessage(source="user", target="Alice", content="Bob's birthday is on 1st January.")
        )
    )


asyncio.run(main())
component_config_schema#

别名 SwarmConfig

component_provider_override: ClassVar[str | None] = 'autogen_agentchat.teams.Swarm'#

覆盖组件的提供者字符串。这应该用于防止内部模块名称成为模块名称的一部分。

DEFAULT_NAME = 'Swarm'#
DEFAULT_DESCRIPTION = '一群代理。'#
_to_config() SwarmConfig[source]#

转储创建与此实例配置匹配的组件新实例所需的配置。

返回:

T – 组件的配置。

classmethod _from_config(config: SwarmConfig) Swarm[source]#

从配置对象创建组件的新实例。

参数:

config (T) – 配置对象。

返回:

Self – 组件的新实例。

class MagenticOneGroupChat(participants: List[ChatAgent], model_client: ChatCompletionClient, *, name: str | None = None, description: str | None = None, termination_condition: TerminationCondition | None = None, max_turns: int | None = 20, runtime: AgentRuntime | None = None, max_stalls: int = 3, final_answer_prompt: str = ORCHESTRATOR_FINAL_ANSWER_PROMPT, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False)[source]#

基类:BaseGroupChat, Component[MagenticOneGroupChatConfig]

一个由 MagenticOneOrchestrator 管理的群聊团队。

协调器处理对话流程,通过管理参与者的互动来确保高效完成任务。

协调器基于 Magentic-One 架构,这是一种用于解决复杂任务的通用多代理系统(参见下面的参考资料)。

RoundRobinGroupChatSelectorGroupChat 不同,MagenticOneGroupChat 不支持将团队作为参与者。

参数:
  • participants (List[ChatAgent]) – 群聊的参与者。

  • model_client (ChatCompletionClient) – 用于生成响应的模型客户端。

  • termination_condition (TerminationCondition, optional) – 群聊的终止条件。默认为 None。如果没有终止条件,群聊将根据协调器逻辑运行,或者直到达到最大轮数。

  • max_turns (int, optional) – 停止群聊前的最大轮数。默认为 20。

  • max_stalls (int, optional) – 重新规划前允许的最大停滞次数。默认为 3。

  • final_answer_prompt (str, optional) – 用于从团队的聊天记录中生成最终答案或响应的 LLM 提示。提供了默认值(对于 GPT-4o 类模型适用)。

  • custom_message_types (List[type[BaseAgentEvent | BaseChatMessage]], optional) – 将在群聊中使用的自定义消息类型列表。如果您正在使用自定义消息类型或您的代理生成自定义消息类型,则需要在此处指定它们。确保您的自定义消息类型是 BaseAgentEventBaseChatMessage 的子类。

  • emit_team_events (bool, optional) – 是否通过 BaseGroupChat.run_stream() 发出团队事件。默认为 False。

抛出:

ValueError – 如果进度分类账缺少所需键,或者下一个发言人无效,则在编排逻辑中会引发此错误。

示例

带有一个助手代理的 MagenticOneGroupChat

import asyncio
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import MagenticOneGroupChat
from autogen_agentchat.ui import Console


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    assistant = AssistantAgent(
        "Assistant",
        model_client=model_client,
    )
    team = MagenticOneGroupChat([assistant], model_client=model_client)
    await Console(team.run_stream(task="Provide a different proof to Fermat last theorem"))


asyncio.run(main())

参考资料

如果您在工作中使用 MagenticOneGroupChat,请引用以下论文

@article{fourney2024magentic,
    title={Magentic-one: A generalist multi-agent system for solving complex tasks},
    author={Fourney, Adam and Bansal, Gagan and Mozannar, Hussein and Tan, Cheng and Salinas, Eduardo and Niedtner, Friederike and Proebsting, Grace and Bassman, Griffin and Gerrits, Jack and Alber, Jacob and others},
    journal={arXiv preprint arXiv:2411.04468},
    year={2024}
}
component_config_schema#

别名 MagenticOneGroupChatConfig

component_provider_override: ClassVar[str | None] = 'autogen_agentchat.teams.MagenticOneGroupChat'#

覆盖组件的提供者字符串。这应该用于防止内部模块名称成为模块名称的一部分。

DEFAULT_NAME = 'MagenticOneGroupChat'#
DEFAULT_DESCRIPTION = '一群代理。'#
_to_config() MagenticOneGroupChatConfig[source]#

转储创建与此实例配置匹配的组件新实例所需的配置。

返回:

T – 组件的配置。

classmethod _from_config(config: MagenticOneGroupChatConfig) Self[source]#

从配置对象创建组件的新实例。

参数:

config (T) – 配置对象。

返回:

Self – 组件的新实例。

class DiGraphBuilder[source]#

基类: object

一个用于构建 DiGraph 执行图的流畅构建器,用于 GraphFlow

警告

这是一个实验性功能,API 在未来的版本中可能会更改。

此实用程序提供了一种方便的方式来以编程方式构建代理交互图,包括复杂的执行流,例如:

  • 顺序链

  • 并行扇出

  • 条件分支

  • 带安全出口的循环

图中的每个节点代表一个代理。边定义了代理之间的执行路径,并且可以选择使用可调用函数根据消息内容进行条件判断。

该构建器与 Graph 运行器兼容,并支持标准代理和过滤代理。

- add_node(agent, activation)

向图中添加一个代理节点。

- add_edge(source, target, condition)

连接两个节点,可选地带一个条件。

- add_conditional_edges(source, condition_to_target)

从源添加多个条件边。

- set_entry_point(agent)

定义默认起始节点(可选)。

- build()

生成一个经过验证的 DiGraph

- get_participants()

返回添加的代理列表。

示例 — 顺序流 A → B → C
>>> builder = GraphBuilder()
>>> builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
>>> builder.add_edge(agent_a, agent_b).add_edge(agent_b, agent_c)
>>> team = Graph(
...     participants=builder.get_participants(),
...     graph=builder.build(),
...     termination_condition=MaxMessageTermination(5),
... )
示例 — 并行扇出 A → (B, C)
>>> builder = GraphBuilder()
>>> builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
>>> builder.add_edge(agent_a, agent_b).add_edge(agent_a, agent_c)
示例 — 条件分支 A → B 或 A → C
>>> builder = GraphBuilder()
>>> builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
>>> # Add conditional edges using keyword check
>>> builder.add_edge(agent_a, agent_b, condition="keyword1")
>>> builder.add_edge(agent_a, agent_c, condition="keyword2")
示例 — 使用自定义字符串条件
>>> builder = GraphBuilder()
>>> builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
>>> # Add condition strings to check in messages
>>> builder.add_edge(agent_a, agent_b, condition="big")
>>> builder.add_edge(agent_a, agent_c, condition="small")
示例 — 循环:A → B → A 或 B → C
>>> builder = GraphBuilder()
>>> builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
>>> builder.add_edge(agent_a, agent_b)
>> # Add a loop back to agent A
>>> builder.add_edge(agent_b, agent_a, condition=lambda msg: "loop" in msg.to_model_text())
>>> # Add exit condition to break the loop
>>> builder.add_edge(agent_b, agent_c, condition=lambda msg: "loop" not in msg.to_model_text())
示例 — 具有多条路径通往同一节点的循环:A → B → C → B
>>> builder = GraphBuilder()
>>> builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
>>> builder.add_edge(agent_a, agent_b)
>>> builder.add_edge(agent_b, agent_c)
>>> builder.add_edge(agent_c, agent_b, activation_group="loop_back")
示例 — 具有多条路径通往同一节点并带有任意激活条件的循环:A → B → (C1, C2) → B → E(exit)
>>> builder = GraphBuilder()
>>> builder.add_node(agent_a).add_node(agent_b).add_node(agent_c1).add_node(agent_c2).add_node(agent_e)
>>> builder.add_edge(agent_a, agent_b)
>>> builder.add_edge(agent_b, agent_c1)
>>> builder.add_edge(agent_b, agent_c2)
>>> builder.add_edge(agent_b, agent_e, condition="exit")
>>> builder.add_edge(agent_c1, agent_b, activation_group="loop_back_group", activation_condition="any")
>>> builder.add_edge(agent_c2, agent_b, activation_group="loop_back_group", activation_condition="any")
add_node(agent: ChatAgent, activation: Literal['all', 'any'] = all) DiGraphBuilder[source]#

向图中添加一个节点并注册其代理。

add_edge(source: str | ChatAgent, target: str | ChatAgent, condition: str | Callable[[BaseChatMessage], bool] | None = None, activation_group: str | None = None, activation_condition: Literal['all', 'any'] | None = None) DiGraphBuilder[source]#

从源到目标添加有向边,可选地带一个条件。

参数:
  • source – 源节点(代理名称或代理对象)

  • target – 目标节点(代理名称或代理对象)

  • condition – 边激活的可选条件。如果是字符串,当消息中找到子字符串时激活。如果可调用,当函数对消息返回 True 时激活。

返回:

用于方法链的 Self

抛出:

ValueError – 如果源节点或目标节点在构建器中不存在

add_conditional_edges(source: str | ChatAgent, condition_to_target: Dict[str, str | ChatAgent]) DiGraphBuilder[source]#

根据关键词检查,从源节点添加多个条件边。

警告

此方法接口在未来将更改以支持可调用条件。如果您需要指定自定义条件,请使用 add_edge

参数:
  • source – 源节点(代理名称或代理对象)

  • condition_to_target

    条件字符串到目标节点的映射。每个键都是将在消息内容中检查的关键词。每个值都是当满足条件时要激活的目标节点。

    对于每个键(关键词),将创建一个 lambda,检查关键词是否存在于消息文本中。

返回:

用于方法链的 Self

set_entry_point(name: str | ChatAgent) DiGraphBuilder[source]#

设置图的默认起始节点。

build() DiGraph[source]#

构建并验证 DiGraph。

get_participants() list[ChatAgent][source]#

按插入顺序返回构建器中的代理列表。

pydantic 模型 DiGraph[source]#

基类: BaseModel

定义了一个带有节点和边的有向图结构。GraphFlow 使用它来确定执行顺序和条件。

警告

这是一个实验性功能,API 在未来的版本中可能会更改。

显示 JSON 模式
{
   "title": "DiGraph",
   "type": "object",
   "properties": {
      "nodes": {
         "default": null,
         "title": "Nodes"
      },
      "default_start_node": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "title": "Default Start Node"
      }
   }
}

字段:
  • default_start_node (str | None)

  • nodes (Dict[str, autogen_agentchat.teams._group_chat._graph._digraph_group_chat.DiGraphNode])

field nodes: Dict[str, DiGraphNode] [必填]#
field default_start_node: str | None = None#
get_parents() Dict[str, List[str]][source]#

计算每个节点到其父节点的映射。

get_start_nodes() Set[str][source]#

返回没有传入边的节点(入口点)。

get_leaf_nodes() Set[str][source]#

返回没有传出边的节点(最终输出节点)。

has_cycles_with_exit() bool[source]#

检查图是否包含任何循环,并验证每个循环至少有一条条件边。

返回:

bool – 如果至少有一个循环且所有循环都具有退出条件,则为 True。如果没有循环,则为 False。

抛出:

ValueError – 如果存在没有任何条件边的循环。

get_has_cycles() bool[source]#

指示图是否至少有一个循环(具有有效的退出条件)。

graph_validate() None[source]#

验证图结构和执行规则。

get_remaining_map() Dict[str, Dict[str, int]][source]#

获取剩余映射,该映射跟踪每个激活组中有多少边指向每个目标节点。

返回:

将目标节点映射到其激活组和剩余计数的字典。

model_post_init(context: Any, /) None#

此函数旨在像 BaseModel 方法一样初始化私有属性。

它将上下文作为参数,因为这是 pydantic-core 在调用它时传递的。

参数:
  • self – BaseModel 实例。

  • context – 上下文。

pydantic 模型 DiGraphNode[source]#

基类: BaseModel

表示 DiGraph 中的一个节点(代理),包含其出边和激活类型。

警告

这是一个实验性功能,API 在未来的版本中可能会更改。

显示 JSON 模式
{
   "title": "DiGraphNode",
   "type": "object",
   "properties": {
      "name": {
         "title": "Name",
         "type": "string"
      },
      "edges": {
         "default": null,
         "title": "Edges"
      },
      "activation": {
         "default": "all",
         "enum": [
            "all",
            "any"
         ],
         "title": "Activation",
         "type": "string"
      }
   },
   "required": [
      "name"
   ]
}

字段:
  • activation (Literal['all', 'any'])

  • edges (List[autogen_agentchat.teams._group_chat._graph._digraph_group_chat.DiGraphEdge])

  • name (str)

field name: str [必填]#
field edges: List[DiGraphEdge] = []#
field activation: Literal['all', 'any'] = all#
pydantic 模型 DiGraphEdge[source]#

基类: BaseModel

表示 DiGraph 中的一条有向边,带有可选的执行条件。

警告

这是一个实验性功能,API 在未来的版本中可能会更改。

警告

如果条件是可调用对象,它将不会在模型中序列化。

显示 JSON 模式
{
   "title": "DiGraphEdge",
   "type": "object",
   "properties": {
      "target": {
         "title": "Target",
         "type": "string"
      },
      "condition": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "title": "Condition"
      },
      "condition_function": {
         "default": null,
         "title": "Condition Function"
      },
      "activation_group": {
         "default": "",
         "title": "Activation Group",
         "type": "string"
      },
      "activation_condition": {
         "default": "all",
         "enum": [
            "all",
            "any"
         ],
         "title": "Activation Condition",
         "type": "string"
      }
   },
   "required": [
      "target"
   ]
}

字段:
  • activation_condition (Literal['all', 'any'])

  • activation_group (str)

  • condition (str | Callable[[autogen_agentchat.messages.BaseChatMessage], bool] | None)

  • condition_function (Callable[[autogen_agentchat.messages.BaseChatMessage], bool] | None)

  • target (str)

验证器:
  • _validate_condition » 所有 字段

field target: str [必填]#
由以下验证:
  • _validate_condition

field condition: str | Callable[[BaseChatMessage], bool] | None = None#

(实验性)执行此边的条件。如果为 None,则边是无条件的。如果是字符串,则边以最后一条代理聊天消息中是否存在该字符串为条件。如果是可调用对象,则边以可调用对象在给定最后一条消息时返回 True 为条件。

由以下验证:
  • _validate_condition

field condition_function: Callable[[BaseChatMessage], bool] | None = None#
由以下验证:
  • _validate_condition

field activation_group: str = ''#

前向依赖的组标识符。

当多条边指向同一目标节点时,它们按此字段分组。这允许区分不同的循环或依赖模式。

示例:在包含 A->B->C->B 这样的循环的图中,指向 B 的两条边(A->B 和 C->B)可以位于不同的激活组中,以控制 B 的激活方式。如果未指定,则默认为目标节点名称。

由以下验证:
  • _validate_condition

field activation_condition: Literal['all', 'any'] = all#

确定同一激活组中的前向依赖项的评估方式。

  • “all”:此激活组中的所有边都必须满足,目标节点才能执行

  • “any”:此激活组中的任何一条边满足,即可允许目标节点执行

这用于处理循环图中复杂的依赖模式,其中多条路径可以 dẫn 到同一目标节点。

由以下验证:
  • _validate_condition

check_condition(message: BaseChatMessage) bool[source]#

检查给定消息的边条件是否满足。

参数:

message – 用于检查条件的消息。

返回:
  • 如果条件满足,则为 True (None 条件总是返回 True)

  • 否则为 False。

class GraphFlow(participants: List[ChatAgent], graph: DiGraph, *, name: str | None = None, description: str | None = None, termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None)[source]#

基类:BaseGroupChat, Component[GraphFlowConfig]

一个遵循有向图执行模式的群聊团队。

警告

这是一个实验性功能,API 在未来的版本中可能会更改。

此群聊根据有向图(DiGraph)结构执行代理,允许复杂的工作流程,例如顺序执行、并行扇出、条件分支、连接模式和带有明确退出条件的循环。

执行顺序由 DiGraph 中定义的边决定。图中的每个节点对应一个代理,边定义了代理之间消息的流动。节点可以配置为在以下情况激活:

  • 所有父节点已完成(激活=”all”)→ 默认

  • 任何父节点完成(激活=”any”)

条件分支通过边条件支持,其中下一个代理(或多个代理)根据聊天历史记录中的内容进行选择。只要存在最终退出循环的条件,就允许循环。

注意

使用 DiGraphBuilder 类可以轻松创建 DiGraph。它提供了一个流畅的 API,用于添加节点和边、设置入口点以及验证图结构。有关更多详细信息,请参阅 DiGraphBuilder 文档。GraphFlow 类旨在与 DiGraphBuilder 结合使用,以创建复杂的工作流。

警告

当在边中使用可调用条件时,它们在调用 dump_component() 时不会被序列化。这将在未来的版本中解决。

参数:
  • participants (List[ChatAgent]) – 群聊的参与者。

  • termination_condition (TerminationCondition, optional) – 聊天的终止条件。

  • max_turns (int, optional) – 强制终止前的最大轮数。

  • graph (DiGraph) – 定义节点流和条件的有向执行图。

抛出:

ValueError – 如果参与者名称不唯一,或者图验证失败(例如,没有出口的循环)。

示例

顺序流:A → B → C

import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main():
    # Initialize agents with OpenAI model clients.
    model_client = OpenAIChatCompletionClient(model="gpt-4.1-nano")
    agent_a = AssistantAgent("A", model_client=model_client, system_message="You are a helpful assistant.")
    agent_b = AssistantAgent("B", model_client=model_client, system_message="Translate input to Chinese.")
    agent_c = AssistantAgent("C", model_client=model_client, system_message="Translate input to English.")

    # Create a directed graph with sequential flow A -> B -> C.
    builder = DiGraphBuilder()
    builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
    builder.add_edge(agent_a, agent_b).add_edge(agent_b, agent_c)
    graph = builder.build()

    # Create a GraphFlow team with the directed graph.
    team = GraphFlow(
        participants=[agent_a, agent_b, agent_c],
        graph=graph,
        termination_condition=MaxMessageTermination(5),
    )

    # Run the team and print the events.
    async for event in team.run_stream(task="Write a short story about a cat."):
        print(event)


asyncio.run(main())

并行扇出:A → (B, C)

import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main():
    # Initialize agents with OpenAI model clients.
    model_client = OpenAIChatCompletionClient(model="gpt-4.1-nano")
    agent_a = AssistantAgent("A", model_client=model_client, system_message="You are a helpful assistant.")
    agent_b = AssistantAgent("B", model_client=model_client, system_message="Translate input to Chinese.")
    agent_c = AssistantAgent("C", model_client=model_client, system_message="Translate input to Japanese.")

    # Create a directed graph with fan-out flow A -> (B, C).
    builder = DiGraphBuilder()
    builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
    builder.add_edge(agent_a, agent_b).add_edge(agent_a, agent_c)
    graph = builder.build()

    # Create a GraphFlow team with the directed graph.
    team = GraphFlow(
        participants=[agent_a, agent_b, agent_c],
        graph=graph,
        termination_condition=MaxMessageTermination(5),
    )

    # Run the team and print the events.
    async for event in team.run_stream(task="Write a short story about a cat."):
        print(event)


asyncio.run(main())

条件分支:A → B(如果“是”)或 C(否则)

import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main():
    # Initialize agents with OpenAI model clients.
    model_client = OpenAIChatCompletionClient(model="gpt-4.1-nano")
    agent_a = AssistantAgent(
        "A",
        model_client=model_client,
        system_message="Detect if the input is in Chinese. If it is, say 'yes', else say 'no', and nothing else.",
    )
    agent_b = AssistantAgent("B", model_client=model_client, system_message="Translate input to English.")
    agent_c = AssistantAgent("C", model_client=model_client, system_message="Translate input to Chinese.")

    # Create a directed graph with conditional branching flow A -> B ("yes"), A -> C (otherwise).
    builder = DiGraphBuilder()
    builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
    # Create conditions as callables that check the message content.
    builder.add_edge(agent_a, agent_b, condition=lambda msg: "yes" in msg.to_model_text())
    builder.add_edge(agent_a, agent_c, condition=lambda msg: "yes" not in msg.to_model_text())
    graph = builder.build()

    # Create a GraphFlow team with the directed graph.
    team = GraphFlow(
        participants=[agent_a, agent_b, agent_c],
        graph=graph,
        termination_condition=MaxMessageTermination(5),
    )

    # Run the team and print the events.
    async for event in team.run_stream(task="AutoGen is a framework for building AI agents."):
        print(event)


asyncio.run(main())

带退出条件的循环:A → B → C(如果“APPROVE”)或 A(否则)

import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main():
    # Initialize agents with OpenAI model clients.
    model_client = OpenAIChatCompletionClient(model="gpt-4.1")
    agent_a = AssistantAgent(
        "A",
        model_client=model_client,
        system_message="You are a helpful assistant.",
    )
    agent_b = AssistantAgent(
        "B",
        model_client=model_client,
        system_message="Provide feedback on the input, if your feedback has been addressed, "
        "say 'APPROVE', otherwise provide a reason for rejection.",
    )
    agent_c = AssistantAgent(
        "C", model_client=model_client, system_message="Translate the final product to Korean."
    )

    # Create a loop graph with conditional exit: A -> B -> C ("APPROVE"), B -> A (otherwise).
    builder = DiGraphBuilder()
    builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
    builder.add_edge(agent_a, agent_b)

    # Create conditional edges using strings
    builder.add_edge(agent_b, agent_c, condition=lambda msg: "APPROVE" in msg.to_model_text())
    builder.add_edge(agent_b, agent_a, condition=lambda msg: "APPROVE" not in msg.to_model_text())

    builder.set_entry_point(agent_a)
    graph = builder.build()

    # Create a GraphFlow team with the directed graph.
    team = GraphFlow(
        participants=[agent_a, agent_b, agent_c],
        graph=graph,
        termination_condition=MaxMessageTermination(20),  # Max 20 messages to avoid infinite loop.
    )

    # Run the team and print the events.
    async for event in team.run_stream(task="Write a short poem about AI Agents."):
        print(event)


asyncio.run(main())
component_config_schema#

别名 GraphFlowConfig

component_provider_override: ClassVar[str | None] = 'autogen_agentchat.teams.GraphFlow'#

覆盖组件的提供者字符串。这应该用于防止内部模块名称成为模块名称的一部分。

DEFAULT_NAME = 'GraphFlow'#
DEFAULT_DESCRIPTION = '一群代理'#