使用 Agent 提取结果#

在运行多 Agent 系统来解决某些任务时,您可能希望在系统达到终止状态后提取结果。本指南展示了一种实现此目的的方法。鉴于 Agent 实例无法从外部直接访问,我们将使用 Agent 将最终结果发布到可访问的位置。

如果您的系统被建模为发布某种 FinalResult 类型,那么您可以创建一个 Agent,其唯一工作是订阅此类型并使其在外部可用。对于像这样简单的 Agent,ClosureAgent 是减少样板代码的一种选择。这允许您定义一个将作为 Agent 的消息处理程序的函数。在此示例中,我们将使用 Agent 和外部代码之间共享的队列来传递结果。

注意

在考虑如何从多 Agent 系统中提取结果时,您必须始终考虑 Agent 的订阅以及它们发布到的主题。这是因为 Agent 只会收到来自其订阅主题的消息。

import asyncio
from dataclasses import dataclass

from autogen_core import (
    ClosureAgent,
    ClosureContext,
    DefaultSubscription,
    DefaultTopicId,
    MessageContext,
    SingleThreadedAgentRuntime,
)

为最终结果定义一个数据类。

@dataclass
class FinalResult:
    value: str

创建一个队列,用于将结果从 Agent 传递到外部代码。

queue = asyncio.Queue[FinalResult]()

为将最终结果输出到队列创建一个函数闭包。该函数必须遵循签名 Callable[[AgentRuntime, AgentId, T, MessageContext], Awaitable[Any]] 其中 T 是 Agent 将接收的消息的类型。您可以使用联合类型来处理多种消息类型。

async def output_result(_agent: ClosureContext, message: FinalResult, ctx: MessageContext) -> None:
    await queue.put(message)

让我们创建一个运行时并注册一个 ClosureAgent,它将把最终结果发布到队列。

runtime = SingleThreadedAgentRuntime()
await ClosureAgent.register_closure(
    runtime, "output_result", output_result, subscriptions=lambda: [DefaultSubscription()]
)
AgentType(type='output_result')

我们可以通过将最终结果直接发布到运行时来模拟最终结果的收集。

runtime.start()
await runtime.publish_message(FinalResult("Result 1"), DefaultTopicId())
await runtime.publish_message(FinalResult("Result 2"), DefaultTopicId())
await runtime.stop_when_idle()

我们可以查看队列以查看最终结果。

while not queue.empty():
    print((result := await queue.get()).value)
Result 1
Result 2