autogen_agentchat.teams#
This module provides implementations of various pre-defined multi-agent teams. Each team inherits from the BaseGroupChat class.
- class BaseGroupChat(participants: List[ChatAgent], group_chat_manager_name: str, group_chat_manager_class: type[SequentialRoutedAgent], termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False)[source]#
Bases: Team, ABC, ComponentBase[BaseModel]
The base class for group chat teams.
To implement a group chat team, first create a subclass of BaseGroupChatManager, then create a subclass of BaseGroupChat that uses the group chat manager.
- component_type: ClassVar[ComponentType] = 'team'#
The logical type of the component.
- async load_state(state: Mapping[str, Any]) → None [source]#
Load an external state and overwrite the current state of the group chat team.
The state is loaded by calling each participant's and the group chat manager's agent_load_state() method with their internal agent IDs. See save_state() for the expected format of the state.
- async pause() → None [source]#
Pause its participants while the team is running by calling their on_pause() method via direct RPC calls.

Note: This is an experimental feature introduced in v0.4.9 and may change or be removed in the future.

The team must be initialized before it can be paused.

Unlike termination, pausing the team does not cause the run() or run_stream() method to return. It calls the on_pause() method on each participant; if a participant does not implement the method, it is a no-op.

Note: It is the responsibility of the agent class to handle the pause and ensure that the agent can be resumed later. Make sure to implement the on_pause() method in your agent class for custom pause behavior. By default, agents do nothing when called.

- Raises:
RuntimeError – If the team has not been initialized. Exceptions raised from the participants' implementations of on_pause are propagated to this method and raised.
- async reset() → None [source]#
Reset the team and its participants to their initial state.
The team must be stopped before it can be reset.
- Raises:
RuntimeError – If the team has not been initialized or is currently running.
Example using a RoundRobinGroupChat team:

```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)

    termination = MaxMessageTermination(3)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

    stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
    async for message in stream:
        print(message)

    # Reset the team.
    await team.reset()
    stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
    async for message in stream:
        print(message)


asyncio.run(main())
```
- async resume() → None [source]#
Resume its participants when the team is running and paused via direct RPC calls, calling their on_resume() method.

Note: This is an experimental feature introduced in v0.4.9 and may change or be removed in the future.

The team must be initialized before it can be resumed.

Unlike termination and restarting with a new task, resuming the team does not cause the run() or run_stream() method to return. It calls the on_resume() method on each participant; if a participant does not implement the method, it is a no-op.

Note: It is the responsibility of the agent class to handle the resume and ensure that the agent continues from where it was paused. Make sure to implement the on_resume() method in your agent class for custom resume behavior.

- Raises:
RuntimeError – If the team has not been initialized. Exceptions raised from the participants' implementations of on_resume are propagated to this method and raised.
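The pause/resume contract above (pausing does not make run() return; the agent is responsible for honoring the pause and continuing afterwards) can be sketched independently of the library. The PausableWorker class below is a hypothetical illustration of that contract, not part of autogen_agentchat:

```python
import asyncio


class PausableWorker:
    """Hypothetical worker illustrating the on_pause()/on_resume() contract."""

    def __init__(self) -> None:
        self._unpaused = asyncio.Event()
        self._unpaused.set()  # Start in the running state.
        self.processed: list[int] = []

    async def on_pause(self) -> None:
        # Pausing does not stop run(); it only blocks further progress.
        self._unpaused.clear()

    async def on_resume(self) -> None:
        # Resuming lets run() continue from where it left off.
        self._unpaused.set()

    async def run(self, items: list[int]) -> list[int]:
        for item in items:
            await self._unpaused.wait()  # Block here while paused.
            self.processed.append(item)
            await asyncio.sleep(0)  # Yield control to the event loop.
        return self.processed


async def main() -> list[int]:
    worker = PausableWorker()
    run_task = asyncio.create_task(worker.run([1, 2, 3, 4]))
    await worker.on_pause()  # run() keeps running but makes no progress.
    await asyncio.sleep(0.01)
    await worker.on_resume()
    return await run_task


print(asyncio.run(main()))
```

Note that run() returns only because the work finishes after resuming, mirroring how pause() on a team does not cause run() or run_stream() to return.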
- async run(*, task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None, cancellation_token: CancellationToken | None = None) → TaskResult [source]#
Run the team and return the result. The base implementation uses run_stream() to run the team and then returns the final result. Once the team stops, the termination condition is reset.
- Parameters:
task (str | BaseChatMessage | Sequence[BaseChatMessage] | None) – The task to run the team with. Can be a string, a single BaseChatMessage, or a list of BaseChatMessage.
cancellation_token (CancellationToken | None) – A cancellation token to terminate the task immediately. Setting the cancellation token may leave the team in an inconsistent state, and the termination condition may not be reset. To gracefully stop the team, use ExternalTermination instead.
- Returns:
result – The result of the task as TaskResult. The result contains the messages produced by the team and the stop reason.
Example using a RoundRobinGroupChat team:

```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)

    termination = MaxMessageTermination(3)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

    result = await team.run(task="Count from 1 to 10, respond one at a time.")
    print(result)

    # Run the team again without a task to continue the previous task.
    result = await team.run()
    print(result)


asyncio.run(main())
```
Example using a CancellationToken to cancel the task:

```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)

    termination = MaxMessageTermination(3)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

    cancellation_token = CancellationToken()

    # Create a task to run the team in the background.
    run_task = asyncio.create_task(
        team.run(
            task="Count from 1 to 10, respond one at a time.",
            cancellation_token=cancellation_token,
        )
    )

    # Wait for 1 second and then cancel the task.
    await asyncio.sleep(1)
    cancellation_token.cancel()

    # This will raise a cancellation error.
    await run_task


asyncio.run(main())
```
- async run_stream(*, task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None, cancellation_token: CancellationToken | None = None) → AsyncGenerator[BaseAgentEvent | BaseChatMessage | TaskResult, None] [source]#
Run the team and produce a stream of messages, with the final result of type TaskResult as the last item in the stream. Once the team stops, the termination condition is reset.

Note: If an agent produces a ModelClientStreamingChunkEvent, the message will be yielded in the stream but will not be included in messages.

- Parameters:
task (str | BaseChatMessage | Sequence[BaseChatMessage] | None) – The task to run the team with. Can be a string, a single BaseChatMessage, or a list of BaseChatMessage.
cancellation_token (CancellationToken | None) – A cancellation token to terminate the task immediately. Setting the cancellation token may leave the team in an inconsistent state, and the termination condition may not be reset. To gracefully stop the team, use ExternalTermination instead.
- Returns:
stream – An AsyncGenerator that yields BaseAgentEvent, BaseChatMessage, and the final result TaskResult as the last item in the stream.
Example using a RoundRobinGroupChat team:

```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)

    termination = MaxMessageTermination(3)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

    stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
    async for message in stream:
        print(message)

    # Run the team again without a task to continue the previous task.
    stream = team.run_stream()
    async for message in stream:
        print(message)


asyncio.run(main())
```
Example using a CancellationToken to cancel the task:

```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.ui import Console
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)

    termination = MaxMessageTermination(3)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

    cancellation_token = CancellationToken()

    # Create a task to run the team in the background.
    run_task = asyncio.create_task(
        Console(
            team.run_stream(
                task="Count from 1 to 10, respond one at a time.",
                cancellation_token=cancellation_token,
            )
        )
    )

    # Wait for 1 second and then cancel the task.
    await asyncio.sleep(1)
    cancellation_token.cancel()

    # This will raise a cancellation error.
    await run_task


asyncio.run(main())
```
- async save_state() → Mapping[str, Any] [source]#
Save the state of the group chat team.
The state is saved by calling the agent_save_state() method on each participant and the group chat manager with their internal agent IDs. The state is returned as a nested dictionary: a dictionary with the key agent_states, which is itself a dictionary keyed by agent name with the states as values.

```python
{
    "agent_states": {
        "agent1": ...,
        "agent2": ...,
        "RoundRobinGroupChatManager": ...,
    }
}
```

Note: Starting from v0.4.9, the state uses agent names as keys instead of agent IDs, and the team_id field has been removed from the state. This makes the state portable across different teams and runtimes. States saved in the old format may not be compatible with the new format in the future.

Note: When save_state() is called while the team is running, the state may be inconsistent and may result in an unexpected state. It is recommended to call this method when the team is not running or after it has stopped.
- pydantic model DiGraph[source]#
Bases: BaseModel
Defines a directed graph structure with nodes and edges. GraphFlow uses this to determine execution order and conditions.

Warning: This is an experimental feature, and the API will change in future releases.
- Fields:
default_start_node (str | None)
nodes (Dict[str, autogen_agentchat.teams._group_chat._graph._digraph_group_chat.DiGraphNode])
- field nodes: Dict[str, DiGraphNode] [Required]#
- has_cycles_with_exit() → bool [source]#
Check whether the graph has any cycles and validate that each cycle has at least one conditional edge.
- Returns:
bool – True if there is at least one cycle and every cycle has an exit condition. False if there are no cycles.
- Raises:
ValueError – If there is a cycle without any conditional edge.
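The validation rule can be sketched without the library. The adjacency structure below mirrors the DiGraphNode/DiGraphEdge shape (a target plus an optional condition), and the hypothetical cycles_have_exit helper walks cycles via DFS, requiring at least one conditional edge on each:

```python
from typing import Optional

# Edges are (target, condition) pairs, mirroring DiGraphEdge's target and
# condition fields; condition=None means an unconditional edge.
Graph = dict[str, list[tuple[str, Optional[str]]]]


def cycles_have_exit(graph: Graph) -> bool:
    """True if there is at least one cycle and every cycle found has a
    conditional edge; False if there are no cycles. Raises ValueError for
    a cycle made only of unconditional edges."""
    found_cycle = False

    def dfs(node: str, path: list[str], conditions: list[Optional[str]]) -> None:
        nonlocal found_cycle
        for target, condition in graph.get(node, []):
            if target in path:
                found_cycle = True
                # Conditions on the edges that form the closed loop.
                cycle_conditions = conditions[path.index(target):] + [condition]
                if not any(c is not None for c in cycle_conditions):
                    raise ValueError(f"Cycle through {target!r} has no conditional edge")
            else:
                dfs(target, path + [target], conditions + [condition])

    for start in graph:
        dfs(start, [start], [])
    return found_cycle


# A -> B -> A is a loop whose back edge carries a condition, so it is valid.
g = {"A": [("B", None)], "B": [("A", "loop"), ("C", "exit")], "C": []}
print(cycles_have_exit(g))  # True
```

This is only a conceptual sketch (exponential in the worst case); the real implementation may differ.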
- class DiGraphBuilder[source]#
Bases: object
A fluent builder for constructing a DiGraph execution graph used in GraphFlow.

Warning: This is an experimental feature, and the API will change in future releases.

This utility provides a convenient way to programmatically build a graph of agent interactions, including complex execution flows such as:
Sequential chains
Parallel fan-outs
Conditional branching
Loops with safe exits
Each node in the graph represents an agent. Edges define the execution paths between agents and can optionally be conditioned on message content.
The builder is compatible with the Graph runner and supports both standard agents and filtered agents.
- add_node(agent, activation)
Add an agent node to the graph.
- add_edge(source, target, condition)
Connect two nodes, optionally with a condition.
- add_conditional_edges(source, condition_to_target)
Add multiple conditional edges from a source node.
- set_entry_point(agent)
Define the default start node (optional).
- build()
Generate a validated DiGraph.
- get_participants()
Return the list of added agents.
- Example — sequential flow A → B → C:

```python
>>> builder = DiGraphBuilder()
>>> builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
>>> builder.add_edge(agent_a, agent_b).add_edge(agent_b, agent_c)
>>> team = GraphFlow(
...     participants=builder.get_participants(),
...     graph=builder.build(),
...     termination_condition=MaxMessageTermination(5),
... )
```

- Example — parallel fan-out A → (B, C):

```python
>>> builder = DiGraphBuilder()
>>> builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
>>> builder.add_edge(agent_a, agent_b).add_edge(agent_a, agent_c)
```

- Example — conditional branching A → B ("yes"), A → C ("no"):

```python
>>> builder = DiGraphBuilder()
>>> builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
>>> builder.add_conditional_edges(agent_a, {"yes": agent_b, "no": agent_c})
```

- Example — loop: A → B → A ("loop"), B → C ("exit"):

```python
>>> builder = DiGraphBuilder()
>>> builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
>>> builder.add_edge(agent_a, agent_b)
>>> builder.add_conditional_edges(agent_b, {"loop": agent_a, "exit": agent_c})
```
- add_conditional_edges(source: str | ChatAgent, condition_to_target: Dict[str, str | ChatAgent]) → DiGraphBuilder [source]#
Add multiple conditional edges from a source node based on condition strings.
- add_edge(source: str | ChatAgent, target: str | ChatAgent, condition: str | None = None) → DiGraphBuilder [source]#
Add a directed edge from source to target, optionally with a condition.
- add_node(agent: ChatAgent, activation: Literal['all', 'any'] = 'all') → DiGraphBuilder [source]#
Add a node to the graph and register its agent.
- set_entry_point(name: str | ChatAgent) → DiGraphBuilder [source]#
Set the default start node of the graph.
- pydantic model DiGraphEdge[source]#
Bases: BaseModel
Represents a directed edge in a DiGraph, with an optional execution condition.

Warning: This is an experimental feature, and the API will change in future releases.
- Fields:
condition (str | None)
target (str)
- pydantic model DiGraphNode[source]#
Bases: BaseModel
Represents a node (agent) in a DiGraph, with its outgoing edges and activation type.

Warning: This is an experimental feature, and the API will change in future releases.
- Fields:
activation (Literal['all', 'any'])
edges (List[autogen_agentchat.teams._group_chat._graph._digraph_group_chat.DiGraphEdge])
name (str)
- field edges: List[DiGraphEdge] = []#
- class GraphFlow(participants: List[ChatAgent], graph: DiGraph, termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None)[source]#
Bases: BaseGroupChat, Component[GraphFlowConfig]
A team that runs a group chat following a directed-graph execution pattern.

Warning: This is an experimental feature, and the API will change in future releases.

This group chat executes agents based on a directed graph (DiGraph) structure, allowing complex task flows such as sequential execution, parallel fan-out, conditional branching, join patterns, and loops with explicit exit conditions.
The execution order is determined by the edges defined in the DiGraph. Each node in the graph corresponds to an agent, and edges define the flow of messages between agents. Nodes can be configured to activate when:
All of their parent nodes have completed (activation="all") → default
Any parent node completes (activation="any")
Conditional branching is supported using edge conditions, where the next agent is selected based on content in the chat history. Loops are allowed as long as there is a condition that eventually exits the loop.

Note: Use the DiGraphBuilder class to easily create a DiGraph. It provides a fluent API for adding nodes and edges, setting entry points, and validating the graph structure. See the DiGraphBuilder documentation for more details. The GraphFlow class is designed to be used with DiGraphBuilder to create complex workflows.

- Parameters:
participants (List[ChatAgent]) – The participants in the group chat.
termination_condition (TerminationCondition, optional) – The termination condition for the chat (optional).
max_turns (int, optional) – The maximum number of turns before forcing termination (optional).
graph (DiGraph) – The directed execution graph defining node flow and conditions.
- Raises:
ValueError – If participant names are not unique, or if graph validation fails (e.g., a cycle without an exit).
Examples:

Sequential flow: A → B → C

```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main():
    # Initialize agents with OpenAI model clients.
    model_client = OpenAIChatCompletionClient(model="gpt-4.1-nano")
    agent_a = AssistantAgent("A", model_client=model_client, system_message="You are a helpful assistant.")
    agent_b = AssistantAgent("B", model_client=model_client, system_message="Translate input to Chinese.")
    agent_c = AssistantAgent("C", model_client=model_client, system_message="Translate input to English.")

    # Create a directed graph with sequential flow A -> B -> C.
    builder = DiGraphBuilder()
    builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
    builder.add_edge(agent_a, agent_b).add_edge(agent_b, agent_c)
    graph = builder.build()

    # Create a GraphFlow team with the directed graph.
    team = GraphFlow(
        participants=[agent_a, agent_b, agent_c],
        graph=graph,
        termination_condition=MaxMessageTermination(5),
    )

    # Run the team and print the events.
    async for event in team.run_stream(task="Write a short story about a cat."):
        print(event)


asyncio.run(main())
```
Parallel fan-out: A → (B, C)

```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main():
    # Initialize agents with OpenAI model clients.
    model_client = OpenAIChatCompletionClient(model="gpt-4.1-nano")
    agent_a = AssistantAgent("A", model_client=model_client, system_message="You are a helpful assistant.")
    agent_b = AssistantAgent("B", model_client=model_client, system_message="Translate input to Chinese.")
    agent_c = AssistantAgent("C", model_client=model_client, system_message="Translate input to Japanese.")

    # Create a directed graph with fan-out flow A -> (B, C).
    builder = DiGraphBuilder()
    builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
    builder.add_edge(agent_a, agent_b).add_edge(agent_a, agent_c)
    graph = builder.build()

    # Create a GraphFlow team with the directed graph.
    team = GraphFlow(
        participants=[agent_a, agent_b, agent_c],
        graph=graph,
        termination_condition=MaxMessageTermination(5),
    )

    # Run the team and print the events.
    async for event in team.run_stream(task="Write a short story about a cat."):
        print(event)


asyncio.run(main())
```
Conditional branching: A → B (if "yes") or C (if "no")

```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main():
    # Initialize agents with OpenAI model clients.
    model_client = OpenAIChatCompletionClient(model="gpt-4.1-nano")
    agent_a = AssistantAgent(
        "A",
        model_client=model_client,
        system_message="Detect if the input is in Chinese. If it is, say 'yes', else say 'no', and nothing else.",
    )
    agent_b = AssistantAgent("B", model_client=model_client, system_message="Translate input to English.")
    agent_c = AssistantAgent("C", model_client=model_client, system_message="Translate input to Chinese.")

    # Create a directed graph with conditional branching flow A -> B ("yes"), A -> C ("no").
    builder = DiGraphBuilder()
    builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
    builder.add_edge(agent_a, agent_b, condition="yes")
    builder.add_edge(agent_a, agent_c, condition="no")
    graph = builder.build()

    # Create a GraphFlow team with the directed graph.
    team = GraphFlow(
        participants=[agent_a, agent_b, agent_c],
        graph=graph,
        termination_condition=MaxMessageTermination(5),
    )

    # Run the team and print the events.
    async for event in team.run_stream(task="AutoGen is a framework for building AI agents."):
        print(event)


asyncio.run(main())
```
Loop with exit condition: A → B → C (if "APPROVE") or A (if "REJECT")

```python
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main():
    # Initialize agents with OpenAI model clients.
    model_client = OpenAIChatCompletionClient(model="gpt-4.1")
    agent_a = AssistantAgent(
        "A",
        model_client=model_client,
        system_message="You are a helpful assistant.",
    )
    agent_b = AssistantAgent(
        "B",
        model_client=model_client,
        system_message="Provide feedback on the input, if your feedback has been addressed, "
        "say 'APPROVE', else say 'REJECT' and provide a reason.",
    )
    agent_c = AssistantAgent(
        "C", model_client=model_client, system_message="Translate the final product to Korean."
    )

    # Create a loop graph with conditional exit: A -> B -> C ("APPROVE"), B -> A ("REJECT").
    builder = DiGraphBuilder()
    builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
    builder.add_edge(agent_a, agent_b)
    builder.add_conditional_edges(agent_b, {"APPROVE": agent_c, "REJECT": agent_a})
    builder.set_entry_point(agent_a)
    graph = builder.build()

    # Create a GraphFlow team with the directed graph.
    team = GraphFlow(
        participants=[agent_a, agent_b, agent_c],
        graph=graph,
        termination_condition=MaxMessageTermination(20),  # Max 20 messages to avoid infinite loop.
    )

    # Run the team and print the events.
    async for event in team.run_stream(task="Write a short poem about AI Agents."):
        print(event)


asyncio.run(main())
```
- component_config_schema#
Alias of GraphFlowConfig
- class MagenticOneGroupChat(participants: List[ChatAgent], model_client: ChatCompletionClient, *, termination_condition: TerminationCondition | None = None, max_turns: int | None = 20, runtime: AgentRuntime | None = None, max_stalls: int = 3, final_answer_prompt: str = ORCHESTRATOR_FINAL_ANSWER_PROMPT, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False)[source]#
Bases: BaseGroupChat, Component[MagenticOneGroupChatConfig]
A team that runs a group chat with participants managed by the MagenticOneOrchestrator.
The orchestrator handles the conversation flow, ensuring that the task is completed efficiently by managing the participants' interactions.
The orchestrator is based on the Magentic-One architecture, a generalist multi-agent system for solving complex tasks (see the references below).
- Parameters:
participants (List[ChatAgent]) – The participants in the group chat.
model_client (ChatCompletionClient) – The model client used for generating responses.
termination_condition (TerminationCondition, optional) – The termination condition for the group chat. Defaults to None. Without a termination condition, the group chat will run based on the orchestrator logic or until the maximum number of turns is reached.
max_turns (int, optional) – The maximum number of turns in the group chat before stopping. Defaults to 20.
max_stalls (int, optional) – The maximum number of stalls allowed before re-planning. Defaults to 3.
final_answer_prompt (str, optional) – The LLM prompt used to generate the final answer or response from the team's transcript. A default (sensible for GPT-4o class models) is provided.
custom_message_types (List[type[BaseAgentEvent | BaseChatMessage]], optional) – A list of custom message types that will be used in the group chat. If you are using custom message types or your agents produce custom message types, you need to specify them here. Make sure your custom message types are subclasses of BaseAgentEvent or BaseChatMessage.
emit_team_events (bool, optional) – Whether to emit team events through BaseGroupChat.run_stream(). Defaults to False.
- Raises:
ValueError – In orchestration logic if the progress ledger does not have the required keys, or if the next speaker is invalid.
Examples:

MagenticOneGroupChat with one assistant agent:

```python
import asyncio

from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import MagenticOneGroupChat
from autogen_agentchat.ui import Console


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    assistant = AssistantAgent(
        "Assistant",
        model_client=model_client,
    )
    team = MagenticOneGroupChat([assistant], model_client=model_client)
    await Console(team.run_stream(task="Provide a different proof to Fermat last theorem"))


asyncio.run(main())
```
References:

If you use MagenticOneGroupChat in your work, please cite the following paper:

```bibtex
@article{fourney2024magentic,
  title={Magentic-one: A generalist multi-agent system for solving complex tasks},
  author={Fourney, Adam and Bansal, Gagan and Mozannar, Hussein and Tan, Cheng and Salinas, Eduardo and Niedtner, Friederike and Proebsting, Grace and Bassman, Griffin and Gerrits, Jack and Alber, Jacob and others},
  journal={arXiv preprint arXiv:2411.04468},
  year={2024}
}
```
- classmethod _from_config(config: MagenticOneGroupChatConfig) → Self [source]#
Create a new instance of the component from a configuration object.
- Parameters:
config (T) – The configuration object.
- Returns:
Self – The new instance of the component.
- component_config_schema#
Alias of MagenticOneGroupChatConfig
- class RoundRobinGroupChat(participants: List[ChatAgent], termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False)[source]#
Bases: BaseGroupChat, Component[RoundRobinGroupChatConfig]
A team that runs a group chat with participants taking turns in round-robin fashion to publish a message to all.
If a single participant is in the team, the participant will be the only speaker.
- Parameters:
participants (List[BaseChatAgent]) – The participants in the group chat.
termination_condition (TerminationCondition, optional) – The termination condition for the group chat. Defaults to None. Without a termination condition, the group chat will run indefinitely.
max_turns (int, optional) – The maximum number of turns in the group chat before stopping. Defaults to None, meaning no limit.
custom_message_types (List[type[BaseAgentEvent | BaseChatMessage]], optional) – A list of custom message types that will be used in the group chat. If you are using custom message types or your agents produce custom message types, you need to specify them here. Make sure your custom message types are subclasses of BaseAgentEvent or BaseChatMessage.
emit_team_events (bool, optional) – Whether to emit team events through BaseGroupChat.run_stream(). Defaults to False.
- Raises:
ValueError – If no participants are provided or if participant names are not unique.
Examples:

A team with one participant with tools:

```python
import asyncio

from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.ui import Console


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    async def get_weather(location: str) -> str:
        return f"The weather in {location} is sunny."

    assistant = AssistantAgent(
        "Assistant",
        model_client=model_client,
        tools=[get_weather],
    )
    termination = TextMentionTermination("TERMINATE")
    team = RoundRobinGroupChat([assistant], termination_condition=termination)
    await Console(team.run_stream(task="What's the weather in New York?"))


asyncio.run(main())
```
A team with multiple participants:

```python
import asyncio

from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.ui import Console


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)

    termination = TextMentionTermination("TERMINATE")
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)
    await Console(team.run_stream(task="Tell me some jokes."))


asyncio.run(main())
```
- classmethod _from_config(config: RoundRobinGroupChatConfig) → Self [source]#
Create a new instance of the component from a configuration object.
- Parameters:
config (T) – The configuration object.
- Returns:
Self – The new instance of the component.
- component_config_schema#
Alias of RoundRobinGroupChatConfig
- class SelectorGroupChat(participants: List[ChatAgent], model_client: ChatCompletionClient, *, termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, selector_prompt: str = 'You are in a role play game. The following roles are available:\n{roles}.\nRead the following conversation. Then select the next role from {participants} to play. Only return the role.\n\n{history}\n\nRead the above conversation. Then select the next role from {participants} to play. Only return the role.\n', allow_repeated_speaker: bool = False, max_selector_attempts: int = 3, selector_func: Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], str | None] | Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], Awaitable[str | None]] | None = None, candidate_func: Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], List[str]] | Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], Awaitable[List[str]]] | None = None, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False, model_client_streaming: bool = False)[source]#
Bases: BaseGroupChat, Component[SelectorGroupChatConfig]
A group chat team in which participants take turns publishing a message to all, using a ChatCompletion model to select the next speaker after each message.
- Parameters:
participants (List[ChatAgent]) – The participants in the group chat. Must have unique names and at least two participants.
model_client (ChatCompletionClient) – The ChatCompletion model client used to select the next speaker.
termination_condition (TerminationCondition, optional) – The termination condition for the group chat. Defaults to None. Without a termination condition, the group chat will run indefinitely.
max_turns (int, optional) – The maximum number of turns in the group chat before stopping. Defaults to None, meaning no limit.
selector_prompt (str, optional) – The prompt template used to select the next speaker. Available fields: '{roles}', '{participants}', and '{history}'. {participants} is the names of the candidates for selection, formatted as ["<name1>", "<name2>", ...]. {roles} is a newline-separated list of the candidate agents' names and descriptions; each line is formatted as "<name> : <description>". {history} is the conversation history, formatted as a double-newline separated list of names and message contents; each message is formatted as "<name> : <message content>".
allow_repeated_speaker (bool, optional) – Whether to include the previous speaker in the list of candidates for the next turn. Defaults to False. The model may still select the previous speaker – a warning will be logged if this happens.
max_selector_attempts (int, optional) – The maximum number of attempts to select a speaker using the model. Defaults to 3. If the model fails to select a speaker after the maximum number of attempts, the previous speaker will be used if available, otherwise the first participant will be used.
selector_func (Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], str | None], Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], Awaitable[str | None]], optional) – A custom selector function that takes the conversation history and returns the name of the next speaker. If provided, this function overrides the model for selecting the next speaker. If the function returns None, the model will be used to select the next speaker.
candidate_func (Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], List[str]], Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], Awaitable[List[str]]], optional) – A custom function that takes the conversation history and returns a filtered list of candidates for selecting the next speaker using the model. If the function returns an empty list or None, SelectorGroupChat will raise a ValueError. This function is only used if selector_func is not set. If set, allow_repeated_speaker will be ignored.
custom_message_types (List[type[BaseAgentEvent | BaseChatMessage]], optional) – A list of custom message types that will be used in the group chat. If you are using custom message types or your agents produce custom message types, you need to specify them here. Make sure your custom message types are subclasses of BaseAgentEvent or BaseChatMessage.
emit_team_events (bool, optional) – Whether to emit team events through BaseGroupChat.run_stream(). Defaults to False.
model_client_streaming (bool, optional) – Whether to use streaming for the model client. (This is useful for reasoning models like QwQ.) Defaults to False.
- Raises:
ValueError – If the number of participants is less than two, or if the selector prompt is invalid.
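The selector_prompt placeholders are filled with ordinary string formatting. The sketch below uses hypothetical roles/participants/history values written in the documented formats to show what the model actually receives:

```python
# Hypothetical values in the documented formats.
roles = "Travel_Advisor : Helps with travel planning.\nHotel_Agent : Helps with hotel booking."
participants = '["Travel_Advisor", "Hotel_Agent"]'
history = "user : Book a trip.\n\nTravel_Advisor : Which city?"

# The default selector_prompt template from the signature above.
selector_prompt = (
    "You are in a role play game. The following roles are available:\n{roles}.\n"
    "Read the following conversation. Then select the next role from {participants} to play. "
    "Only return the role.\n\n{history}\n\n"
    "Read the above conversation. Then select the next role from {participants} to play. "
    "Only return the role.\n"
)

filled = selector_prompt.format(roles=roles, participants=participants, history=history)
print(filled)
```

Note that {participants} appears twice in the default template, so the candidate list is repeated in the filled prompt.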
Examples:

A team with multiple participants:

```python
import asyncio

from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.ui import Console


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    async def lookup_hotel(location: str) -> str:
        return f"Here are some hotels in {location}: hotel1, hotel2, hotel3."

    async def lookup_flight(origin: str, destination: str) -> str:
        return f"Here are some flights from {origin} to {destination}: flight1, flight2, flight3."

    async def book_trip() -> str:
        return "Your trip is booked!"

    travel_advisor = AssistantAgent(
        "Travel_Advisor",
        model_client,
        tools=[book_trip],
        description="Helps with travel planning.",
    )
    hotel_agent = AssistantAgent(
        "Hotel_Agent",
        model_client,
        tools=[lookup_hotel],
        description="Helps with hotel booking.",
    )
    flight_agent = AssistantAgent(
        "Flight_Agent",
        model_client,
        tools=[lookup_flight],
        description="Helps with flight booking.",
    )
    termination = TextMentionTermination("TERMINATE")
    team = SelectorGroupChat(
        [travel_advisor, hotel_agent, flight_agent],
        model_client=model_client,
        termination_condition=termination,
    )
    await Console(team.run_stream(task="Book a 3-day trip to new york."))


asyncio.run(main())
```
A team with a custom selector function:

```python
import asyncio
from typing import Sequence

from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.ui import Console
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    def check_calculation(x: int, y: int, answer: int) -> str:
        if x + y == answer:
            return "Correct!"
        else:
            return "Incorrect!"

    agent1 = AssistantAgent(
        "Agent1",
        model_client,
        description="For calculation",
        system_message="Calculate the sum of two numbers",
    )
    agent2 = AssistantAgent(
        "Agent2",
        model_client,
        tools=[check_calculation],
        description="For checking calculation",
        system_message="Check the answer and respond with 'Correct!' or 'Incorrect!'",
    )

    def selector_func(messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> str | None:
        if len(messages) == 1 or messages[-1].to_text() == "Incorrect!":
            return "Agent1"
        if messages[-1].source == "Agent1":
            return "Agent2"
        return None

    termination = TextMentionTermination("Correct!")
    team = SelectorGroupChat(
        [agent1, agent2],
        model_client=model_client,
        selector_func=selector_func,
        termination_condition=termination,
    )

    await Console(team.run_stream(task="What is 1 + 1?"))


asyncio.run(main())
```
- classmethod _from_config(config: SelectorGroupChatConfig) → Self [source]#
Create a new instance of the component from a configuration object.
- Parameters:
config (T) – The configuration object.
- Returns:
Self – The new instance of the component.
- component_config_schema#
Alias of SelectorGroupChatConfig
- class Swarm(participants: List[ChatAgent], termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False)[source]#
Bases: BaseGroupChat, Component[SwarmConfig]
A group chat team that selects the next speaker based solely on handoff messages.
The first participant in the participants list is the initial speaker. The next speaker is selected based on the HandoffMessage sent by the current speaker. If no handoff message is sent, the current speaker continues as the speaker.
- Parameters:
participants (List[ChatAgent]) – The agents participating in the group chat. The first agent in the list is the initial speaker.
termination_condition (TerminationCondition, optional) – The termination condition for the group chat. Defaults to None. Without a termination condition, the group chat will run indefinitely.
max_turns (int, optional) – The maximum number of turns in the group chat before stopping. Defaults to None, meaning no limit.
custom_message_types (List[type[BaseAgentEvent | BaseChatMessage]], optional) – A list of custom message types that will be used in the group chat. If you are using custom message types or your agents produce custom message types, you need to specify them here. Make sure your custom message types are subclasses of BaseAgentEvent or BaseChatMessage.
emit_team_events (bool, optional) – Whether to emit team events through BaseGroupChat.run_stream(). Defaults to False.
Basic example:

```python
import asyncio

from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import Swarm
from autogen_agentchat.conditions import MaxMessageTermination


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent(
        "Alice",
        model_client=model_client,
        handoffs=["Bob"],
        system_message="You are Alice and you only answer questions about yourself.",
    )
    agent2 = AssistantAgent(
        "Bob", model_client=model_client, system_message="You are Bob and your birthday is on 1st January."
    )

    termination = MaxMessageTermination(3)
    team = Swarm([agent1, agent2], termination_condition=termination)

    stream = team.run_stream(task="What is bob's birthday?")
    async for message in stream:
        print(message)


asyncio.run(main())
```
Example using HandoffTermination for human-in-the-loop handoff:

```python
import asyncio

from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import Swarm
from autogen_agentchat.conditions import HandoffTermination, MaxMessageTermination
from autogen_agentchat.ui import Console
from autogen_agentchat.messages import HandoffMessage


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent = AssistantAgent(
        "Alice",
        model_client=model_client,
        handoffs=["user"],
        system_message="You are Alice and you only answer questions about yourself, ask the user for help if needed.",
    )
    termination = HandoffTermination(target="user") | MaxMessageTermination(3)
    team = Swarm([agent], termination_condition=termination)

    # Start the conversation.
    await Console(team.run_stream(task="What is bob's birthday?"))

    # Resume with user feedback.
    await Console(
        team.run_stream(
            task=HandoffMessage(source="user", target="Alice", content="Bob's birthday is on 1st January.")
        )
    )


asyncio.run(main())
```
- classmethod _from_config(config: SwarmConfig) → Swarm [source]#
Create a new instance of the component from a configuration object.
- Parameters:
config (T) – The configuration object.
- Returns:
Self – The new instance of the component.
- component_config_schema#
Alias of SwarmConfig