使用干预处理程序终止#

注意

此方法在使用 SingleThreadedAgentRuntime 时有效。

autogen_core 中有许多不同的方法来处理终止。最终目标是检测运行时不再需要执行,并且可以继续进行最终化任务。其中一种方法是使用 autogen_core.base.intervention.InterventionHandler 来检测终止消息,然后对其采取行动。

from dataclasses import dataclass
from typing import Any

from autogen_core import (
    DefaultInterventionHandler,
    DefaultTopicId,
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    default_subscription,
    message_handler,
)

首先,我们为常规消息和将用于信号终止的消息定义一个数据类。

@dataclass
class Message:
    content: Any


@dataclass
class Termination:
    reason: str

我们将代理编码为在决定是时候终止时发布终止消息。

@default_subscription
class AnAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("MyAgent")
        self.received = 0

    @message_handler
    async def on_new_message(self, message: Message, ctx: MessageContext) -> None:
        self.received += 1
        if self.received > 3:
            await self.publish_message(Termination(reason="Reached maximum number of messages"), DefaultTopicId())

接下来,我们创建一个 InterventionHandler,它将检测终止消息并对其采取行动。它会钩入发布,当它遇到 Termination 时,它会改变其内部状态以指示已请求终止。

class TerminationHandler(DefaultInterventionHandler):
    def __init__(self) -> None:
        self._termination_value: Termination | None = None

    async def on_publish(self, message: Any, *, message_context: MessageContext) -> Any:
        if isinstance(message, Termination):
            self._termination_value = message
        return message

    @property
    def termination_value(self) -> Termination | None:
        return self._termination_value

    @property
    def has_terminated(self) -> bool:
        return self._termination_value is not None

最后,我们将此处理程序添加到运行时并使用它来检测终止并在收到终止消息时停止运行时。

termination_handler = TerminationHandler()
runtime = SingleThreadedAgentRuntime(intervention_handlers=[termination_handler])

await AnAgent.register(runtime, "my_agent", AnAgent)

runtime.start()

# Publish more than 3 messages to trigger termination.
await runtime.publish_message(Message("hello"), DefaultTopicId())
await runtime.publish_message(Message("hello"), DefaultTopicId())
await runtime.publish_message(Message("hello"), DefaultTopicId())
await runtime.publish_message(Message("hello"), DefaultTopicId())

# Wait for termination.
await runtime.stop_when(lambda: termination_handler.has_terminated)

print(termination_handler.termination_value)
Termination(reason='Reached maximum number of messages')