autogen_core#
- class Agent(*args, **kwargs)[source]#
基类:
Protocol- property metadata: AgentMetadata#
代理的元数据。
- async bind_id_and_runtime(id: AgentId, runtime: AgentRuntime) None[source]#
用于将 Agent 实例绑定到 AgentRuntime 的函数。
- 参数:
agent_id (AgentId) – 代理的 ID。
runtime (AgentRuntime) – 要将代理绑定的 AgentRuntime 实例。
- async on_message(message: Any, ctx: MessageContext) Any[source]#
代理的消息处理程序。这应仅由运行时调用,而不是由其他代理调用。
- 参数:
message (Any) – 接收到的消息。类型是 subscriptions 中类型之一。
ctx (MessageContext) – 消息的上下文。
- 返回:
Any – 对消息的响应。可以是 None。
- 抛出:
CancelledError – 如果消息被取消。
CantHandleException – 如果代理无法处理该消息。
- class AgentId(type: str | AgentType, key: str)[source]#
基类:
objectAgent ID 在代理运行时(包括分布式运行时)中唯一标识一个代理实例。它是代理实例接收消息的“地址”。
更多信息请参见: Agent Identity and Lifecycle
- class AgentProxy(agent: AgentId, runtime: AgentRuntime)[source]#
基类:
object一个辅助类,允许您使用
AgentId代替其关联的Agent。- property metadata: Awaitable[AgentMetadata]#
代理的元数据。
- class AgentRuntime(*args, **kwargs)[source]#
基类:
Protocol- async send_message(message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) Any[source]#
向代理发送消息并获取响应。
- 参数:
message (Any) – 要发送的消息。
recipient (AgentId) – 要将消息发送到的代理。
sender (AgentId | None, optional) – 发送消息的代理。除非是从外部直接发送到运行时,否则 **仅** 应为 None。默认为 None。
cancellation_token (CancellationToken | None, optional) – 用于取消正在进行的 . 的令牌。默认为 None。
- 抛出:
CantHandleException – 如果收件人无法处理该消息。
UndeliverableException – 如果消息无法传递。
Other – 收到者引发的任何其他异常。
- 返回:
Any – 来自代理的响应。
- async publish_message(message: Any, topic_id: TopicId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) None[source]#
将消息发布到给定命名空间中的所有代理,或者如果未提供命名空间,则发布到发送者的命名空间。
发布消息不期望有响应。
- 参数:
message (Any) – 要发布的消息。
topic_id (TopicId) – 要将消息发布到的主题。
sender (AgentId | None, optional) – 发送消息的代理。默认为 None。
cancellation_token (CancellationToken | None, optional) – 用于取消正在进行的 . 的令牌。默认为 None。
message_id (str | None, optional) – 消息 ID。如果为 None,将生成新的消息 ID。默认为 None。此消息 ID 必须是唯一的。建议使用 UUID。
- 抛出:
UndeliverableException – 如果消息无法传递。
- async register_factory(type: str | AgentType, agent_factory: Callable[[], T | Awaitable[T]], *, expected_class: type[T] | None = None) AgentType[source]#
使用运行时和特定类型注册代理工厂。类型必须是唯一的。此 API 不会添加任何订阅。
注意
这是一个底层 API,通常应使用代理类的 register 方法,因为它也会自动处理订阅。
示例
from dataclasses import dataclass from autogen_core import AgentRuntime, MessageContext, RoutedAgent, event from autogen_core.models import UserMessage @dataclass class MyMessage: content: str class MyAgent(RoutedAgent): def __init__(self) -> None: super().__init__("My core agent") @event async def handler(self, message: UserMessage, context: MessageContext) -> None: print("Event received: ", message.content) async def my_agent_factory(): return MyAgent() async def main() -> None: runtime: AgentRuntime = ... # type: ignore await runtime.register_factory("my_agent", lambda: MyAgent()) import asyncio asyncio.run(main())
- async register_agent_instance(agent_instance: Agent, agent_id: AgentId) AgentId[source]#
使用运行时注册代理实例。类型可以被重用,但每个 agent_id 必须是唯一的。类型内的所有代理实例必须是同一对象类型。此 API 不会添加任何订阅。
注意
这是一个底层 API,通常应使用代理类的 register_instance 方法,因为它也会自动处理订阅。
示例
from dataclasses import dataclass from autogen_core import AgentId, AgentRuntime, MessageContext, RoutedAgent, event from autogen_core.models import UserMessage @dataclass class MyMessage: content: str class MyAgent(RoutedAgent): def __init__(self) -> None: super().__init__("My core agent") @event async def handler(self, message: UserMessage, context: MessageContext) -> None: print("Event received: ", message.content) async def main() -> None: runtime: AgentRuntime = ... # type: ignore agent = MyAgent() await runtime.register_agent_instance( agent_instance=agent, agent_id=AgentId(type="my_agent", key="default") ) import asyncio asyncio.run(main())
- async try_get_underlying_agent_instance(id: AgentId, type: Type[T] = Agent) T[source]#
尝试按名称和命名空间获取底层代理实例。这通常不被推荐(因此名称很长),但在某些情况下可能有用。
如果底层代理无法访问,这将引发异常。
- 参数:
id (AgentId) – 代理 ID。
type (Type[T], optional) – 代理的预期类型。默认为 Agent。
- 返回:
T – 具体代理实例。
- 抛出:
LookupError – 如果找不到代理。
NotAccessibleError – 如果代理无法访问,例如它位于远程。
TypeError – 如果代理不是预期的类型。
- async get(id: AgentId, /, *, lazy: bool = True) AgentId[source]#
- async get(type: AgentType | str, /, key: str = 'default', *, lazy: bool = True) AgentId
- async save_state() Mapping[str, Any][source]#
保存整个运行时的状态,包括所有托管的代理。恢复状态的唯一方法是将其传递给
load_state()。状态的结构由实现定义,可以是任何 JSON 可序列化的对象。
- 返回:
Mapping[str, Any] – 已保存的状态。
- async load_state(state: Mapping[str, Any]) None[source]#
加载整个运行时的状态,包括所有托管的代理。状态应与
save_state()返回的状态相同。- 参数:
state (Mapping[str, Any]) – 已保存的状态。
- async agent_metadata(agent: AgentId) AgentMetadata[source]#
获取代理的元数据。
- 参数:
agent (AgentId) – 代理 ID。
- 返回:
AgentMetadata – 代理元数据。
- async agent_save_state(agent: AgentId) Mapping[str, Any][source]#
保存单个代理的状态。
状态的结构由实现定义,可以是任何 JSON 可序列化的对象。
- 参数:
agent (AgentId) – 代理 ID。
- 返回:
Mapping[str, Any] – 已保存的状态。
- async add_subscription(subscription: Subscription) None[source]#
添加一个运行时应在处理已发布消息时满足的新订阅
- 参数:
subscription (Subscription) – 要添加的订阅
- async remove_subscription(id: str) None[source]#
从运行时中删除订阅
- 参数:
id (str) – 要删除的订阅的 ID
- 抛出:
LookupError – 如果订阅不存在
- add_message_serializer(serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) None[source]#
向运行时添加新的消息序列化器
注意:这将根据 type_name 和 data_content_type 属性去重序列化器
- 参数:
serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) – 要添加的序列化器/s
- class BaseAgent(description: str)[source]#
-
- property metadata: AgentMetadata#
代理的元数据。
- async bind_id_and_runtime(id: AgentId, runtime: AgentRuntime) None[source]#
用于将 Agent 实例绑定到 AgentRuntime 的函数。
- 参数:
agent_id (AgentId) – 代理的 ID。
runtime (AgentRuntime) – 要将代理绑定的 AgentRuntime 实例。
- property runtime: AgentRuntime#
- final async on_message(message: Any, ctx: MessageContext) Any[source]#
代理的消息处理程序。这应仅由运行时调用,而不是由其他代理调用。
- 参数:
message (Any) – 接收到的消息。类型是 subscriptions 中类型之一。
ctx (MessageContext) – 消息的上下文。
- 返回:
Any – 对消息的响应。可以是 None。
- 抛出:
CancelledError – 如果消息被取消。
CantHandleException – 如果代理无法处理该消息。
- abstract async on_message_impl(message: Any, ctx: MessageContext) Any[source]#
- async send_message(message: Any, recipient: AgentId, *, cancellation_token: CancellationToken | None = None, message_id: str | None = None) Any[source]#
- async publish_message(message: Any, topic_id: TopicId, *, cancellation_token: CancellationToken | None = None) None[source]#
- async load_state(state: Mapping[str, Any]) None[source]#
加载来自 save_state 的代理状态。
- 参数:
state (Mapping[str, Any]) – 代理的状态。必须是 JSON 可序列化的。
- class CacheStore[source]#
Bases:
ABC,Generic[T],ComponentBase[BaseModel]此协议定义了存储/缓存操作的基本接口。
子类应处理底层存储的生命周期。
- component_type: ClassVar[ComponentType] = 'cache_store'#
组件的逻辑类型。
- class InMemoryStore[source]#
Bases:
CacheStore[T],Component[InMemoryStoreConfig]- component_provider_override: ClassVar[str | None] = 'autogen_core.InMemoryStore'#
覆盖组件的提供者字符串。这应该用于防止内部模块名称成为模块名称的一部分。
- component_config_schema#
alias of
InMemoryStoreConfig
- class AgentInstantiationContext[source]#
基类:
object一个静态类,提供代理实例化的上下文。
此静态类可用于在代理实例化过程中——在工厂函数或代理的类构造函数内部——访问当前的运行时和代理 ID。
示例
在工厂函数和代理的构造函数中获取当前的运行时和代理 ID。
import asyncio from dataclasses import dataclass from autogen_core import ( AgentId, AgentInstantiationContext, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler, ) @dataclass class TestMessage: content: str class TestAgent(RoutedAgent): def __init__(self, description: str): super().__init__(description) # Get the current runtime -- we don't use it here, but it's available. _ = AgentInstantiationContext.current_runtime() # Get the current agent ID. agent_id = AgentInstantiationContext.current_agent_id() print(f"Current AgentID from constructor: {agent_id}") @message_handler async def handle_test_message(self, message: TestMessage, ctx: MessageContext) -> None: print(f"Received message: {message.content}") def test_agent_factory() -> TestAgent: # Get the current runtime -- we don't use it here, but it's available. _ = AgentInstantiationContext.current_runtime() # Get the current agent ID. agent_id = AgentInstantiationContext.current_agent_id() print(f"Current AgentID from factory: {agent_id}") return TestAgent(description="Test agent") async def main() -> None: # Create a SingleThreadedAgentRuntime instance. runtime = SingleThreadedAgentRuntime() # Start the runtime. runtime.start() # Register the agent type with a factory function. await runtime.register_factory("test_agent", test_agent_factory) # Send a message to the agent. The runtime will instantiate the agent and call the message handler. await runtime.send_message(TestMessage(content="Hello, world!"), AgentId("test_agent", "default")) # Stop the runtime. await runtime.stop() asyncio.run(main())
- classmethod current_runtime() AgentRuntime[source]#
- class TopicId(type: str, source: str)[source]#
基类:
objectTopicId 定义了广播消息的范围。本质上,代理运行时通过其广播 API 实现发布-订阅模型:发布消息时,必须指定主题。
更多信息请参见:主题
- type: str#
此 topic_id 所包含的事件类型。遵循 CloudEvents 规范。
必须匹配模式:^[w-.:=]+Z
在此处了解更多信息:cloudevents/spec
- source: str#
标识事件发生的上下文。遵循 CloudEvents 规范。
在此处了解更多信息:cloudevents/spec
- class Subscription(*args, **kwargs)[source]#
基类:
ProtocolSubscriptions 定义了代理感兴趣的主题。
- class MessageContext(sender: AgentId | None, topic_id: TopicId | None, is_rpc: bool, cancellation_token: CancellationToken, message_id: str)[source]#
基类:
object- cancellation_token: CancellationToken#
- class Image(image: Image)[source]#
基类:
object表示图像。
示例
从 URL 加载图像。
from autogen_core import Image from PIL import Image as PILImage import aiohttp import asyncio async def from_url(url: str) -> Image: async with aiohttp.ClientSession() as session: async with session.get(url) as response: content = await response.read() return Image.from_pil(PILImage.open(content)) image = asyncio.run(from_url("https://example.com/image"))
- class RoutedAgent(description: str)[source]#
Bases:
BaseAgent基于消息类型和可选匹配函数的代理基类,用于将消息路由到处理程序。
要创建路由代理,请继承此类并使用
event()或rpc()装饰器将消息处理程序添加为方法。示例
from dataclasses import dataclass from autogen_core import MessageContext from autogen_core import RoutedAgent, event, rpc @dataclass class Message: pass @dataclass class MessageWithContent: content: str @dataclass class Response: pass class MyAgent(RoutedAgent): def __init__(self): super().__init__("MyAgent") @event async def handle_event_message(self, message: Message, ctx: MessageContext) -> None: assert ctx.topic_id is not None await self.publish_message(MessageWithContent("event handled"), ctx.topic_id) @rpc(match=lambda message, ctx: message.content == "special") # type: ignore async def handle_special_rpc_message(self, message: MessageWithContent, ctx: MessageContext) -> Response: return Response()
- async on_message_impl(message: Any, ctx: MessageContext) Any | None[source]#
通过将消息路由到适当的消息处理程序来处理消息。请不要在子类中覆盖此方法。而是将消息处理程序添加为使用
event()或rpc()装饰器修饰的方法。
- async on_unhandled_message(message: Any, ctx: MessageContext) None[source]#
收到一条没有匹配的消息处理程序的收到的消息时调用。默认实现记录一条信息消息。
- class ClosureAgent(description: str, closure: Callable[[ClosureContext, T, MessageContext], Awaitable[Any]], *, unknown_type_policy: Literal['error', 'warn', 'ignore'] = 'warn')[source]#
Bases:
BaseAgent,ClosureContext- property metadata: AgentMetadata#
代理的元数据。
- property runtime: AgentRuntime#
- async on_message_impl(message: Any, ctx: MessageContext) Any[source]#
- async classmethod register_closure(runtime: AgentRuntime, type: str, closure: Callable[[ClosureContext, T, MessageContext], Awaitable[Any]], *, unknown_type_policy: Literal['error', 'warn', 'ignore'] = 'warn', skip_direct_message_subscription: bool = False, description: str = '', subscriptions: Callable[[], list[Subscription] | Awaitable[list[Subscription]]] | None = None) AgentType[source]#
闭包代理允许您使用闭包(函数)来定义代理,而无需定义类。它允许从运行时中提取值。
闭包可以定义预期的消息类型,或者可以使用 Any 来接受任何类型的消息。
示例
import asyncio from autogen_core import SingleThreadedAgentRuntime, MessageContext, ClosureAgent, ClosureContext from dataclasses import dataclass from autogen_core._default_subscription import DefaultSubscription from autogen_core._default_topic import DefaultTopicId @dataclass class MyMessage: content: str async def main(): queue = asyncio.Queue[MyMessage]() async def output_result(_ctx: ClosureContext, message: MyMessage, ctx: MessageContext) -> None: await queue.put(message) runtime = SingleThreadedAgentRuntime() await ClosureAgent.register_closure( runtime, "output_result", output_result, subscriptions=lambda: [DefaultSubscription()] ) runtime.start() await runtime.publish_message(MyMessage("Hello, world!"), DefaultTopicId()) await runtime.stop_when_idle() result = await queue.get() print(result) asyncio.run(main())
- 参数:
runtime (AgentRuntime) – 要注册代理的运行时。
type (str) – 注册代理的代理类型。
closure (Callable[[ClosureContext, T, MessageContext], Awaitable[Any]]) – 用于处理消息的闭包。
unknown_type_policy (Literal["error", "warn", "ignore"], optional) – 如果遇到与闭包类型不匹配的类型时该怎么办。默认为“warn”。
skip_direct_message_subscription (bool, optional) – 不要为此代理添加直接消息订阅。默认为 False。
description (str, optional) – 对代理功能的描述。默认为“”.
subscriptions (Callable[[], list[Subscription] | Awaitable[list[Subscription]]] | None, optional) – 此闭包代理的订阅列表。默认为 None。
- 返回:
AgentType – 已注册代理的类型。
- class ClosureContext(*args, **kwargs)[source]#
基类:
Protocol
- message_handler(func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]] = None, *, strict: bool = True, match: None | Callable[[ReceivesT, MessageContext], bool] = None) Callable[[Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]]], MessageHandler[AgentT, ReceivesT, ProducesT]] | MessageHandler[AgentT, ReceivesT, ProducesT][source]#
泛型消息处理程序的装饰器。
将此装饰器添加到
RoutedAgent类中的方法,这些方法旨在处理事件消息和 RPC 消息。这些方法必须具有特定的签名,并且必须遵循该签名才能使其有效。该方法必须是 async 方法。
该方法必须用 @message_handler 装饰器进行装饰。
- 该方法必须有 3 个参数。
self
message:要处理的消息,必须用其预期处理的消息类型进行类型提示。
ctx:一个
autogen_core.MessageContext对象。
该方法必须用它作为响应可以返回的消息类型进行类型提示,或者如果它不返回任何内容,则可以返回 None。
处理程序可以通过接受消息类型的联合来处理多种消息类型。它也可以通过返回消息类型的联合来返回多种消息类型。
- 参数:
func – 要装饰的函数。
strict – 如果为 True,则在消息类型或返回类型不在目标类型中时,处理程序将引发异常。如果为 False,则会记录一条警告。
match – 一个函数,它将消息和上下文作为参数并返回一个布尔值。这用于在消息类型之后进行二次路由。对于处理相同消息类型的处理程序,匹配函数将按字母顺序应用处理程序,并且将调用第一个匹配的处理程序,而跳过其余的处理程序。如果为 None,则将调用第一个按字母顺序匹配相同消息类型的处理程序。
- event(func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, None]] = None, *, strict: bool = True, match: None | Callable[[ReceivesT, MessageContext], bool] = None) Callable[[Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, None]]], MessageHandler[AgentT, ReceivesT, None]] | MessageHandler[AgentT, ReceivesT, None][source]#
事件消息处理器的装饰器。
将此装饰器添加到
RoutedAgent类中旨在处理事件消息的方法上。这些方法必须遵循特定的签名才能使其有效该方法必须是 async 方法。
该方法必须用 @message_handler 装饰器进行装饰。
- 该方法必须有 3 个参数。
self
message:要处理的事件消息,此消息必须进行类型提示,指示其要处理的消息类型。
ctx:一个
autogen_core.MessageContext对象。
该方法必须返回 None。
通过接受消息类型的联合,处理程序可以处理多种消息类型。
- 参数:
func – 要装饰的函数。
strict – 如果为 True,当消息类型不在目标类型中时,处理程序将引发异常。如果为 False,它将改为记录警告。
match – 一个函数,它将消息和上下文作为参数并返回一个布尔值。这用于在消息类型之后进行二次路由。对于处理相同消息类型的处理程序,匹配函数将按字母顺序应用处理程序,并且将调用第一个匹配的处理程序,而跳过其余的处理程序。如果为 None,则将调用第一个按字母顺序匹配相同消息类型的处理程序。
- rpc(func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]] = None, *, strict: bool = True, match: None | Callable[[ReceivesT, MessageContext], bool] = None) Callable[[Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]]], MessageHandler[AgentT, ReceivesT, ProducesT]] | MessageHandler[AgentT, ReceivesT, ProducesT][source]#
RPC 消息处理器的装饰器。
将此装饰器添加到
RoutedAgent类中旨在处理 RPC 消息的方法上。这些方法必须遵循特定的签名才能使其有效该方法必须是 async 方法。
该方法必须用 @message_handler 装饰器进行装饰。
- 该方法必须有 3 个参数。
self
message:要处理的消息,必须用其预期处理的消息类型进行类型提示。
ctx:一个
autogen_core.MessageContext对象。
该方法必须用它作为响应可以返回的消息类型进行类型提示,或者如果它不返回任何内容,则可以返回 None。
处理程序可以通过接受消息类型的联合来处理多种消息类型。它也可以通过返回消息类型的联合来返回多种消息类型。
- 参数:
func – 要装饰的函数。
strict – 如果为 True,则在消息类型或返回类型不在目标类型中时,处理程序将引发异常。如果为 False,则会记录一条警告。
match – 一个函数,它将消息和上下文作为参数并返回一个布尔值。这用于在消息类型之后进行二次路由。对于处理相同消息类型的处理程序,匹配函数将按字母顺序应用处理程序,并且将调用第一个匹配的处理程序,而跳过其余的处理程序。如果为 None,则将调用第一个按字母顺序匹配相同消息类型的处理程序。
- class TypeSubscription(topic_type: str, agent_type: str | AgentType, id: str | None = None)[source]#
Bases:
Subscription此订阅根据主题类型匹配,并使用主题的源作为代理键映射到代理。
此订阅导致每个源都有自己的代理实例。
示例
from autogen_core import TypeSubscription subscription = TypeSubscription(topic_type="t1", agent_type="a1")
在这种情况下
类型为 t1、源为 s1 的 topic_id 将由键为 s1 的 a1 类型的代理处理。
类型为 t1、源为 s2 的 topic_id 将由键为 s2 的 a1 类型的代理处理。
- class DefaultSubscription(topic_type: str = 'default', agent_type: str | AgentType | None = None)[source]#
Bases:
TypeSubscription默认订阅旨在为仅需要代理全局范围的应用程序提供合理的默认值。
此主题默认使用“default”主题类型,并尝试根据实例化上下文检测要使用的代理类型。
- class DefaultTopicId(type: str = 'default', source: str | None = None)[source]#
Bases:
TopicIdDefaultTopicId 为 TopicId 的 topic_id 和 source 字段提供了合理的默认值。
如果在消息处理器的上下文中创建,则 source 将设置为消息处理器的 agent_id,否则将设置为“default”。
- default_subscription(cls: Type[BaseAgentType] | None = None) Callable[[Type[BaseAgentType]], Type[BaseAgentType]] | Type[BaseAgentType][source]#
- class TypePrefixSubscription(topic_type_prefix: str, agent_type: str | AgentType, id: str | None = None)[source]#
Bases:
Subscription此订阅根据类型前缀匹配主题,并使用主题的源作为代理键映射到代理。
此订阅导致每个源都有自己的代理实例。
示例
from autogen_core import TypePrefixSubscription subscription = TypePrefixSubscription(topic_type_prefix="t1", agent_type="a1")
在这种情况下
类型为 t1、源为 s1 的 topic_id 将由键为 s1 的 a1 类型的代理处理。
类型为 t1、源为 s2 的 topic_id 将由键为 s2 的 a1 类型的代理处理。
类型为 t1SUFFIX、源为 s2 的 topic_id 将由键为 s2 的 a1 类型的代理处理。
- JSON_DATA_CONTENT_TYPE = 'application/json'#
JSON 数据的 Content-Type。
- PROTOBUF_DATA_CONTENT_TYPE = 'application/x-protobuf'#
Protobuf 数据的 Content-Type。
- class SingleThreadedAgentRuntime(*, intervention_handlers: List[InterventionHandler] | None = None, tracer_provider: TracerProvider | None = None, ignore_unhandled_exceptions: bool = True)[source]#
Bases:
AgentRuntime单线程代理运行时,它使用单个 asyncio 队列处理所有消息。消息按接收顺序传递,并且运行时在单独的 asyncio 任务中并发处理每条消息。
注意
此运行时适用于开发和独立应用程序。不适用于高吞吐量或高并发场景。
- 参数:
intervention_handlers (List[InterventionHandler], optional) – 可以在消息发送或发布之前拦截消息的干预处理程序列表。默认为 None。
tracer_provider (TracerProvider, optional) – 用于跟踪的跟踪器提供程序。默认为 None。此外,您可以设置环境变量 AUTOGEN_DISABLE_RUNTIME_TRACING 为 true 以禁用代理运行时遥测,如果您没有访问运行时构造函数。例如,如果您使用的是 ComponentConfig。
ignore_unhandled_exceptions (bool, optional) – 是否忽略代理事件处理程序中发生的未处理异常。任何后台异常将在下一次调用 process_next 或从已等待的 stop、stop_when_idle 或 stop_when 中引发。请注意,这不适用于 RPC 处理程序。默认为 True。
示例
创建运行时、注册代理、发送消息并停止运行时的简单示例
import asyncio from dataclasses import dataclass from autogen_core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler @dataclass class MyMessage: content: str class MyAgent(RoutedAgent): @message_handler async def handle_my_message(self, message: MyMessage, ctx: MessageContext) -> None: print(f"Received message: {message.content}") async def main() -> None: # Create a runtime and register the agent runtime = SingleThreadedAgentRuntime() await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My agent")) # Start the runtime, send a message and stop the runtime runtime.start() await runtime.send_message(MyMessage("Hello, world!"), recipient=AgentId("my_agent", "default")) await runtime.stop() asyncio.run(main())
创建运行时、注册代理、发布消息并停止运行时的示例
import asyncio from dataclasses import dataclass from autogen_core import ( DefaultTopicId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, default_subscription, message_handler, ) @dataclass class MyMessage: content: str # The agent is subscribed to the default topic. @default_subscription class MyAgent(RoutedAgent): @message_handler async def handle_my_message(self, message: MyMessage, ctx: MessageContext) -> None: print(f"Received message: {message.content}") async def main() -> None: # Create a runtime and register the agent runtime = SingleThreadedAgentRuntime() await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My agent")) # Start the runtime. runtime.start() # Publish a message to the default topic that the agent is subscribed to. await runtime.publish_message(MyMessage("Hello, world!"), DefaultTopicId()) # Wait for the message to be processed and then stop the runtime. await runtime.stop_when_idle() asyncio.run(main())
- async send_message(message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) Any[source]#
向代理发送消息并获取响应。
- 参数:
message (Any) – 要发送的消息。
recipient (AgentId) – 要将消息发送到的代理。
sender (AgentId | None, optional) – 发送消息的代理。除非是从外部直接发送到运行时,否则 **仅** 应为 None。默认为 None。
cancellation_token (CancellationToken | None, optional) – 用于取消正在进行的 . 的令牌。默认为 None。
- 抛出:
CantHandleException – 如果收件人无法处理该消息。
UndeliverableException – 如果消息无法传递。
Other – 收到者引发的任何其他异常。
- 返回:
Any – 来自代理的响应。
- async publish_message(message: Any, topic_id: TopicId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) None[source]#
将消息发布到给定命名空间中的所有代理,或者如果未提供命名空间,则发布到发送者的命名空间。
发布消息不期望有响应。
- 参数:
message (Any) – 要发布的消息。
topic_id (TopicId) – 要将消息发布到的主题。
sender (AgentId | None, optional) – 发送消息的代理。默认为 None。
cancellation_token (CancellationToken | None, optional) – 用于取消正在进行的 . 的令牌。默认为 None。
message_id (str | None, optional) – 消息 ID。如果为 None,将生成新的消息 ID。默认为 None。此消息 ID 必须是唯一的。建议使用 UUID。
- 抛出:
UndeliverableException – 如果消息无法传递。
- async save_state() Mapping[str, Any][source]#
保存所有实例化代理的状态。
此方法调用每个代理上的
save_state()方法,并返回一个将代理 ID 映射到其状态的字典。注意
此方法目前不保存订阅状态。我们将在未来添加此功能。
- 返回:
一个将代理 ID 映射到其状态的字典。
- async load_state(state: Mapping[str, Any]) None[source]#
加载所有实例化代理的状态。
此方法使用字典中提供的状态,在每个代理上调用
load_state()方法。字典的键是代理 ID,值是save_state()方法返回的状态字典。注意
此方法目前不加载订阅状态。我们将在未来添加此功能。
- async process_next() None[source]#
处理队列中的下一条消息。
如果在后台任务中发生未处理的异常,它将在此处引发。process_next 在引发未处理异常后无法再次调用。
- start() None[source]#
启动运行时消息处理循环。这将在后台任务中运行。
示例
import asyncio from autogen_core import SingleThreadedAgentRuntime async def main() -> None: runtime = SingleThreadedAgentRuntime() runtime.start() # ... do other things ... await runtime.stop() asyncio.run(main())
- async close() None[source]#
如果适用,则调用
stop()以及所有实例化代理上的Agent.close()方法
- async stop_when(condition: Callable[[], bool]) None[source]#
当满足条件时,停止运行时消息处理循环。
注意
不建议使用此方法,它仅用于旧版。它会启动一个忙碌的循环来不断检查条件。调用 stop_when_idle 或 stop 更有效。如果您需要根据条件停止运行时,请考虑使用后台任务和 asyncio.Event 来发出信号,指示何时满足条件以及后台任务应调用 stop。
- async agent_metadata(agent: AgentId) AgentMetadata[source]#
获取代理的元数据。
- 参数:
agent (AgentId) – 代理 ID。
- 返回:
AgentMetadata – 代理元数据。
- async agent_save_state(agent: AgentId) Mapping[str, Any][source]#
保存单个代理的状态。
状态的结构由实现定义,可以是任何 JSON 可序列化的对象。
- 参数:
agent (AgentId) – 代理 ID。
- 返回:
Mapping[str, Any] – 已保存的状态。
- async register_factory(type: str | AgentType, agent_factory: Callable[[], T | Awaitable[T]], *, expected_class: type[T] | None = None) AgentType[source]#
使用运行时和特定类型注册代理工厂。类型必须是唯一的。此 API 不会添加任何订阅。
注意
这是一个底层 API,通常应使用代理类的 register 方法,因为它也会自动处理订阅。
示例
from dataclasses import dataclass from autogen_core import AgentRuntime, MessageContext, RoutedAgent, event from autogen_core.models import UserMessage @dataclass class MyMessage: content: str class MyAgent(RoutedAgent): def __init__(self) -> None: super().__init__("My core agent") @event async def handler(self, message: UserMessage, context: MessageContext) -> None: print("Event received: ", message.content) async def my_agent_factory(): return MyAgent() async def main() -> None: runtime: AgentRuntime = ... # type: ignore await runtime.register_factory("my_agent", lambda: MyAgent()) import asyncio asyncio.run(main())
- async register_agent_instance(agent_instance: Agent, agent_id: AgentId) AgentId[source]#
使用运行时注册代理实例。类型可以被重用,但每个 agent_id 必须是唯一的。类型内的所有代理实例必须是同一对象类型。此 API 不会添加任何订阅。
注意
这是一个底层 API,通常应使用代理类的 register_instance 方法,因为它也会自动处理订阅。
示例
from dataclasses import dataclass from autogen_core import AgentId, AgentRuntime, MessageContext, RoutedAgent, event from autogen_core.models import UserMessage @dataclass class MyMessage: content: str class MyAgent(RoutedAgent): def __init__(self) -> None: super().__init__("My core agent") @event async def handler(self, message: UserMessage, context: MessageContext) -> None: print("Event received: ", message.content) async def main() -> None: runtime: AgentRuntime = ... # type: ignore agent = MyAgent() await runtime.register_agent_instance( agent_instance=agent, agent_id=AgentId(type="my_agent", key="default") ) import asyncio asyncio.run(main())
- async try_get_underlying_agent_instance(id: AgentId, type: Type[T] = Agent) T[source]#
尝试按名称和命名空间获取底层代理实例。这通常不被推荐(因此名称很长),但在某些情况下可能有用。
如果底层代理无法访问,这将引发异常。
- 参数:
id (AgentId) – 代理 ID。
type (Type[T], optional) – 代理的预期类型。默认为 Agent。
- 返回:
T – 具体代理实例。
- 抛出:
LookupError – 如果找不到代理。
NotAccessibleError – 如果代理无法访问,例如它位于远程。
TypeError – 如果代理不是预期的类型。
- async add_subscription(subscription: Subscription) None[source]#
添加一个运行时应在处理已发布消息时满足的新订阅
- 参数:
subscription (Subscription) – 要添加的订阅
- async remove_subscription(id: str) None[source]#
从运行时中删除订阅
- 参数:
id (str) – 要删除的订阅的 ID
- 抛出:
LookupError – 如果订阅不存在
- async get(id_or_type: AgentId | AgentType | str, /, key: str = 'default', *, lazy: bool = True) AgentId[source]#
- add_message_serializer(serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) None[source]#
向运行时添加新的消息序列化器
注意:这将根据 type_name 和 data_content_type 属性去重序列化器
- 参数:
serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) – 要添加的序列化器/s
- ROOT_LOGGER_NAME = 'autogen_core'#
根日志记录器的名称。
- EVENT_LOGGER_NAME = 'autogen_core.events'#
用于结构化事件的日志记录器的名称。
- TRACE_LOGGER_NAME = 'autogen_core.trace'#
用于开发人员预期跟踪日志记录的记录器名称。此日志的内容和格式不应被依赖。
- class Component[source]#
Bases:
ComponentFromConfig[ConfigT],ComponentSchemaType[ConfigT],Generic[ConfigT]要创建组件类,请继承此类以获取具体类,并继承 ComponentBase 以获取接口。然后实现两个类变量
component_config_schema- 代表组件配置的 Pydantic 模型类。这也是 Component 的类型参数。component_type- 组件的逻辑类型是什么。
示例
from __future__ import annotations from pydantic import BaseModel from autogen_core import Component class Config(BaseModel): value: str class MyComponent(Component[Config]): component_type = "custom" component_config_schema = Config def __init__(self, value: str): self.value = value def _to_config(self) -> Config: return Config(value=self.value) @classmethod def _from_config(cls, config: Config) -> MyComponent: return cls(value=config.value)
- class ComponentBase[source]#
Bases:
ComponentToConfig[ConfigT],ComponentLoader,Generic[ConfigT]
- class ComponentFromConfig[source]#
Bases:
Generic[FromConfigT]
- class ComponentLoader[source]#
基类:
object- classmethod load_component(model: ComponentModel | Dict[str, Any], expected: None = None) Self[source]#
- classmethod load_component(model: ComponentModel | Dict[str, Any], expected: Type[ExpectedType]) ExpectedType
从模型加载组件。 intended to be used with the return type of
autogen_core.ComponentConfig.dump_component()。示例
from autogen_core import ComponentModel from autogen_core.models import ChatCompletionClient component: ComponentModel = ... # type: ignore model_client = ChatCompletionClient.load_component(component)
- 参数:
model (ComponentModel) – 要从中加载组件的模型。
model – _description_
expected (Type[ExpectedType] | None, optional) – 仅当直接在 ComponentLoader 上使用时才需要显式类型。默认为 None。
- 返回:
Self – 加载的组件。
- 抛出:
ValueError – 如果 provider 字符串无效。
TypeError – Provider 不是 ComponentConfigImpl 的子类,或者预期的类型不匹配。
- 返回:
Self | ExpectedType – 加载的组件。
- pydantic model ComponentModel[source]#
基类:
BaseModel组件的模型类。包含实例化组件所需的所有信息。
显示 JSON 模式
{ "title": "ComponentModel", "description": "Model class for a component. Contains all information required to instantiate a component.", "type": "object", "properties": { "provider": { "title": "Provider", "type": "string" }, "component_type": { "anyOf": [ { "enum": [ "model", "agent", "tool", "termination", "token_provider", "workbench" ], "type": "string" }, { "type": "string" }, { "type": "null" } ], "default": null, "title": "Component Type" }, "version": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "title": "Version" }, "component_version": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "title": "Component Version" }, "description": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Description" }, "label": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Label" }, "config": { "title": "Config", "type": "object" } }, "required": [ "provider", "config" ] }
- 字段:
component_type (Literal['model', 'agent', 'tool', 'termination', 'token_provider', 'workbench'] | str | None)component_version (int | None)config (dict[str, Any])description (str | None)label (str | None)provider (str)version (int | None)
- class ComponentSchemaType[source]#
Bases:
Generic[ConfigT]- required_class_vars = ['component_config_schema', 'component_type']#
- class ComponentToConfig[source]#
Bases:
Generic[ToConfigT]一个类必须实现的两个方法才能成为组件。
- 参数:
Protocol (ConfigT) – 派生自
pydantic.BaseModel的类型。
- component_type: ClassVar[Literal['model', 'agent', 'tool', 'termination', 'token_provider', 'workbench'] | str]#
组件的逻辑类型。
- dump_component() ComponentModel[source]#
将组件转储为可以重新加载的模型。
- 抛出:
TypeError – 如果组件是本地类。
- 返回:
ComponentModel – 代表组件的模型。
- class InterventionHandler(*args, **kwargs)[source]#
基类:
Protocol干预处理程序是一个类,可用于修改、记录或丢弃由
autogen_core.base.AgentRuntime处理的消息。消息提交到运行时时会调用该处理程序。
目前唯一支持此功能的运行时是
autogen_core.base.SingleThreadedAgentRuntime。注意:从任何干预处理程序方法返回 None 将导致发出警告,并被视为“无更改”。如果您打算丢弃消息,则应显式返回
DropMessage。示例
from autogen_core import DefaultInterventionHandler, MessageContext, AgentId, SingleThreadedAgentRuntime from dataclasses import dataclass from typing import Any @dataclass class MyMessage: content: str class MyInterventionHandler(DefaultInterventionHandler): async def on_send(self, message: Any, *, message_context: MessageContext, recipient: AgentId) -> MyMessage: if isinstance(message, MyMessage): message.content = message.content.upper() return message runtime = SingleThreadedAgentRuntime(intervention_handlers=[MyInterventionHandler()])
- async on_send(message: Any, *, message_context: MessageContext, recipient: AgentId) Any | type[DropMessage][source]#
当通过
autogen_core.base.AgentRuntime.send_message()将消息提交给 AgentRuntime 时调用。
- async on_publish(message: Any, *, message_context: MessageContext) Any | type[DropMessage][source]#
当通过
autogen_core.base.AgentRuntime.publish_message()将消息发布到 AgentRuntime 时调用。
- class DefaultInterventionHandler(*args, **kwargs)[source]#
Bases:
InterventionHandler提供所有干预处理程序方法的默认实现的简单类,这些方法只是返回未更改的消息。允许轻松子类化以仅覆盖所需的方法。
- async on_send(message: Any, *, message_context: MessageContext, recipient: AgentId) Any | type[DropMessage][source]#
当通过
autogen_core.base.AgentRuntime.send_message()将消息提交给 AgentRuntime 时调用。
- async on_publish(message: Any, *, message_context: MessageContext) Any | type[DropMessage][source]#
当通过
autogen_core.base.AgentRuntime.publish_message()将消息发布到 AgentRuntime 时调用。
- trace_create_agent_span(agent_name: str, *, tracer: Tracer | None = None, parent: Span | None = None, agent_id: str | None = None, agent_description: str | None = None) Generator[Span, Any, None][source]#
用于创建代理的 span 的上下文管理器,遵循 OpenTelemetry 生成式 AI 系统语义约定。
请参阅 GenAI 语义约定文档: OpenTelemetry GenAI 语义约定
警告
GenAI 语义约定仍处于孵化阶段,可能会在后续版本中发生更改。
- trace_invoke_agent_span(agent_name: str, *, tracer: Tracer | None = None, parent: Span | None = None, agent_id: str | None = None, agent_description: str | None = None) Generator[Span, Any, None][source]#
用于调用代理的 span 的上下文管理器,遵循 OpenTelemetry 生成式 AI 系统语义约定。
请参阅 GenAI 语义约定文档: OpenTelemetry GenAI 语义约定
警告
GenAI 语义约定仍处于孵化阶段,可能会在后续版本中发生更改。
- trace_tool_span(tool_name: str, *, tracer: Tracer | None = None, parent: Span | None = None, tool_description: str | None = None, tool_call_id: str | None = None) Generator[Span, Any, None][source]#
用于工具执行的 span 的上下文管理器,遵循 OpenTelemetry 生成式 AI 系统语义约定。
请参阅 GenAI 语义约定文档: OpenTelemetry GenAI 语义约定
警告
GenAI 语义约定仍处于孵化阶段,可能会在后续版本中发生更改。