人机协作#

在前一节 团队 中,我们已经了解了如何创建、观察和控制一个代理团队。 本节将重点介绍如何从您的应用程序与团队交互,并向团队提供人为反馈。

从您的应用程序与团队交互主要有两种方式

  1. 在团队运行时 – 执行 run()run_stream(),通过 UserProxyAgent 提供反馈。

  2. 一旦运行终止,通过输入下一个对 run()run_stream() 的调用来提供反馈。

我们将在本节中介绍这两种方法。

要直接跳到与 Web 和 UI 框架集成的代码示例,请参阅以下链接

在运行期间提供反馈#

UserProxyAgent 是一个特殊的内置代理,充当用户代理,向团队提供反馈。

要使用 UserProxyAgent,您可以创建它的一个实例,并在运行团队之前将其包含在团队中。 团队将决定何时调用 UserProxyAgent 来征求用户的反馈。

例如,在 RoundRobinGroupChat 团队中,UserProxyAgent 按照其传递给团队的顺序被调用,而在 SelectorGroupChat 团队中,选择器提示或选择器函数决定何时调用 UserProxyAgent

下图说明了如何在团队运行时使用 UserProxyAgent 从用户获取反馈

human-in-the-loop-user-proxy

粗体箭头表示团队运行期间的控制流:当团队调用 UserProxyAgent 时,它将控制权转移给应用程序/用户,并等待反馈; 一旦提供反馈,控制权将转移回团队,团队将继续执行。

注意

当在运行期间调用 UserProxyAgent 时,它会阻止团队的执行,直到用户提供反馈或出错为止。 这将阻碍团队的进度,并使团队处于无法保存或恢复的不稳定状态。

由于这种方法的阻塞性质,建议仅将其用于需要用户立即反馈的短时间交互,例如要求通过单击按钮批准或不批准,或者需要立即注意否则会失败的任务的警报。

以下是如何在 RoundRobinGroupChat 中使用 UserProxyAgent 进行诗歌生成任务的示例

from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Create the agents.
model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")
assistant = AssistantAgent("assistant", model_client=model_client)
user_proxy = UserProxyAgent("user_proxy", input_func=input)  # Use input() to get user input from console.

# Create the termination condition which will end the conversation when the user says "APPROVE".
termination = TextMentionTermination("APPROVE")

# Create the team.
team = RoundRobinGroupChat([assistant, user_proxy], termination_condition=termination)

# Run the conversation and stream to the console.
stream = team.run_stream(task="Write a 4-line poem about the ocean.")
# Use asyncio.run(...) when running in a script.
await Console(stream)
await model_client.close()
---------- user ----------
Write a 4-line poem about the ocean.
---------- assistant ----------
In endless blue where whispers play,  
The ocean's waves dance night and day.  
A world of depths, both calm and wild,  
Nature's heart, forever beguiled.  
TERMINATE
---------- user_proxy ----------
APPROVE
TaskResult(messages=[TextMessage(source='user', models_usage=None, metadata={}, content='Write a 4-line poem about the ocean.', type='TextMessage'), TextMessage(source='assistant', models_usage=RequestUsage(prompt_tokens=46, completion_tokens=43), metadata={}, content="In endless blue where whispers play,  \nThe ocean's waves dance night and day.  \nA world of depths, both calm and wild,  \nNature's heart, forever beguiled.  \nTERMINATE", type='TextMessage'), UserInputRequestedEvent(source='user_proxy', models_usage=None, metadata={}, request_id='2622a0aa-b776-4e54-9e8f-4ecbdf14b78d', content='', type='UserInputRequestedEvent'), TextMessage(source='user_proxy', models_usage=None, metadata={}, content='APPROVE', type='TextMessage')], stop_reason="Text 'APPROVE' mentioned")

从控制台输出中,您可以看到该团队通过 user_proxy 征求用户的反馈以批准生成的诗歌。

您可以为 UserProxyAgent 提供您自己的输入函数,以自定义反馈流程。 例如,当团队作为 Web 服务运行时,您可以使用自定义输入函数来等待来自 Web 套接字连接的消息。 以下代码段显示了使用 FastAPI Web 框架时的自定义输入函数示例

@app.websocket("/ws/chat")
async def chat(websocket: WebSocket):
    await websocket.accept()

    async def _user_input(prompt: str, cancellation_token: CancellationToken | None) -> str:
        data = await websocket.receive_json() # Wait for user message from websocket.
        message = TextMessage.model_validate(data) # Assume user message is a TextMessage.
        return message.content
    
    # Create user proxy with custom input function
    # Run the team with the user proxy
    # ...

有关完整的示例,请参见 AgentChat FastAPI 示例

有关 ChainLitUserProxyAgent 集成的示例,请参见 AgentChat ChainLit 示例

为下一次运行提供反馈#

通常,应用程序或用户以交互循环的方式与代理团队进行交互:团队运行到终止,应用程序或用户提供反馈,然后团队再次运行并提供反馈。

这种方法在团队和应用程序/用户之间进行异步通信的持久会话中很有用:一旦团队完成运行,应用程序将保存团队的状态,将其放入持久性存储中,并在收到反馈后恢复团队。

注意

有关如何保存和加载团队的状态的信息,请参阅 管理状态。 本节将重点介绍反馈机制。

下图说明了此方法中的控制流

human-in-the-loop-termination

有两种方法可以实现这种方法

  • 设置最大轮数,以便团队始终在指定的轮数后停止。

  • 使用诸如 TextMentionTerminationHandoffTermination 之类的终止条件,以允许团队根据团队的内部状态决定何时停止并将控制权交还。

您可以将这两种方法结合使用,以实现所需的行为。

使用最大轮数#

此方法允许您通过设置最大轮数来暂停团队以供用户输入。 例如,您可以通过将 max_turns 设置为 1 来配置团队在第一个代理响应后停止。 这在需要持续用户参与的情况下特别有用,例如在聊天机器人中。

要实现此目的,请在 RoundRobinGroupChat() 构造函数中设置 max_turns 参数。

team = RoundRobinGroupChat([...], max_turns=1)

团队停止后,轮数将重置。 当您恢复团队时,它将从 0 重新开始。 但是,团队的内部状态将被保留,例如,RoundRobinGroupChat 将从列表中的下一个代理处恢复,并具有相同的对话历史记录。

注意

max_turn 专门用于团队类,目前仅受 RoundRobinGroupChatSelectorGroupChatSwarm 支持。当与终止条件一起使用时,团队将在满足任一条件时停止。

以下是如何在 RoundRobinGroupChat 中使用 max_turns 来完成诗歌生成任务(最多 1 轮)的示例

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Create the agents.
model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")
assistant = AssistantAgent("assistant", model_client=model_client)

# Create the team setting a maximum number of turns to 1.
team = RoundRobinGroupChat([assistant], max_turns=1)

task = "Write a 4-line poem about the ocean."
while True:
    # Run the conversation and stream to the console.
    stream = team.run_stream(task=task)
    # Use asyncio.run(...) when running in a script.
    await Console(stream)
    # Get the user response.
    task = input("Enter your feedback (type 'exit' to leave): ")
    if task.lower().strip() == "exit":
        break
await model_client.close()
---------- user ----------
Write a 4-line poem about the ocean.
---------- assistant ----------
Endless waves in a dance with the shore,  
Whispers of secrets in tales from the roar,  
Beneath the vast sky, where horizons blend,  
The ocean’s embrace is a timeless friend.  
TERMINATE
[Prompt tokens: 46, Completion tokens: 48]
---------- Summary ----------
Number of messages: 2
Finish reason: Maximum number of turns 1 reached.
Total prompt tokens: 46
Total completion tokens: 48
Duration: 1.63 seconds
---------- user ----------
Can you make it about a person and its relationship with the ocean
---------- assistant ----------
She walks along the tide, where dreams intertwine,  
With every crashing wave, her heart feels aligned,  
In the ocean's embrace, her worries dissolve,  
A symphony of solace, where her spirit evolves.  
TERMINATE
[Prompt tokens: 117, Completion tokens: 49]
---------- Summary ----------
Number of messages: 2
Finish reason: Maximum number of turns 1 reached.
Total prompt tokens: 117
Total completion tokens: 49
Duration: 1.21 seconds

您可以看到,团队在一个代理响应后立即停止。

使用终止条件#

在前面的章节中,我们已经看到了几个终止条件的示例。在本节中,我们将重点介绍 HandoffTermination,它会在代理发送 HandoffMessage 消息时停止团队。

让我们创建一个团队,其中包含一个具有移交设置的 AssistantAgent 代理,并运行该团队,执行一项需要用户额外输入的任务,因为该代理没有继续处理该任务的相关工具。

注意

AssistantAgent 一起使用的模型必须支持工具调用才能使用移交功能。

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import Handoff
from autogen_agentchat.conditions import HandoffTermination, TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Create an OpenAI model client.
model_client = OpenAIChatCompletionClient(
    model="gpt-4o",
    # api_key="sk-...", # Optional if you have an OPENAI_API_KEY env variable set.
)

# Create a lazy assistant agent that always hands off to the user.
lazy_agent = AssistantAgent(
    "lazy_assistant",
    model_client=model_client,
    handoffs=[Handoff(target="user", message="Transfer to user.")],
    system_message="If you cannot complete the task, transfer to user. Otherwise, when finished, respond with 'TERMINATE'.",
)

# Define a termination condition that checks for handoff messages.
handoff_termination = HandoffTermination(target="user")
# Define a termination condition that checks for a specific text mention.
text_termination = TextMentionTermination("TERMINATE")

# Create a single-agent team with the lazy assistant and both termination conditions.
lazy_agent_team = RoundRobinGroupChat([lazy_agent], termination_condition=handoff_termination | text_termination)

# Run the team and stream to the console.
task = "What is the weather in New York?"
await Console(lazy_agent_team.run_stream(task=task), output_stats=True)
---------- user ----------
What is the weather in New York?
---------- lazy_assistant ----------
[FunctionCall(id='call_EAcMgrLGHdLw0e7iJGoMgxuu', arguments='{}', name='transfer_to_user')]
[Prompt tokens: 69, Completion tokens: 12]
---------- lazy_assistant ----------
[FunctionExecutionResult(content='Transfer to user.', call_id='call_EAcMgrLGHdLw0e7iJGoMgxuu')]
---------- lazy_assistant ----------
Transfer to user.
---------- Summary ----------
Number of messages: 4
Finish reason: Handoff to user from lazy_assistant detected.
Total prompt tokens: 69
Total completion tokens: 12
Duration: 0.69 seconds
TaskResult(messages=[TextMessage(source='user', models_usage=None, content='What is the weather in New York?', type='TextMessage'), ToolCallRequestEvent(source='lazy_assistant', models_usage=RequestUsage(prompt_tokens=69, completion_tokens=12), content=[FunctionCall(id='call_EAcMgrLGHdLw0e7iJGoMgxuu', arguments='{}', name='transfer_to_user')], type='ToolCallRequestEvent'), ToolCallExecutionEvent(source='lazy_assistant', models_usage=None, content=[FunctionExecutionResult(content='Transfer to user.', call_id='call_EAcMgrLGHdLw0e7iJGoMgxuu')], type='ToolCallExecutionEvent'), HandoffMessage(source='lazy_assistant', models_usage=None, target='user', content='Transfer to user.', context=[], type='HandoffMessage')], stop_reason='Handoff to user from lazy_assistant detected.')

您可以看到,由于检测到移交消息,团队已停止。 让我们通过提供代理需要的信息来继续团队。

await Console(lazy_agent_team.run_stream(task="The weather in New York is sunny."))
---------- user ----------
The weather in New York is sunny.
---------- lazy_assistant ----------
Great! Enjoy the sunny weather in New York! Is there anything else you'd like to know?
---------- lazy_assistant ----------
TERMINATE
TaskResult(messages=[TextMessage(source='user', models_usage=None, content='The weather in New York is sunny.', type='TextMessage'), TextMessage(source='lazy_assistant', models_usage=RequestUsage(prompt_tokens=110, completion_tokens=21), content="Great! Enjoy the sunny weather in New York! Is there anything else you'd like to know?", type='TextMessage'), TextMessage(source='lazy_assistant', models_usage=RequestUsage(prompt_tokens=137, completion_tokens=5), content='TERMINATE', type='TextMessage')], stop_reason="Text 'TERMINATE' mentioned")

您可以看到,在用户提供信息后,团队继续运行。

注意

如果您正在使用针对用户的带有 HandoffTerminationSwarm 团队,要恢复团队,您需要将 task 设置为 HandoffMessage,并将 target 设置为您要运行的下一个代理。 有关更多详细信息,请参见 Swarm