autogen_core#

class Agent(*args, **kwargs)[source]#

基类:Protocol

property metadata: AgentMetadata#

代理的元数据。

property id: AgentId#

代理的 ID。

async bind_id_and_runtime(id: AgentId, runtime: AgentRuntime) None[source]#

用于将 Agent 实例绑定到 AgentRuntime 的函数。

参数:
  • agent_id (AgentId) – 代理的 ID。

  • runtime (AgentRuntime) – 要将代理绑定的 AgentRuntime 实例。

async on_message(message: Any, ctx: MessageContext) Any[source]#

代理的消息处理程序。这应仅由运行时调用,而不是由其他代理调用。

参数:
  • message (Any) – 接收到的消息。类型是 subscriptions 中类型之一。

  • ctx (MessageContext) – 消息的上下文。

返回:

Any – 对消息的响应。可以是 None。

抛出:
async save_state() Mapping[str, Any][source]#

保存代理的状态。结果必须是 JSON 可序列化的。

async load_state(state: Mapping[str, Any]) None[source]#

加载来自 save_state 的代理状态。

参数:

state (Mapping[str, Any]) – 代理的状态。必须是 JSON 可序列化的。

async close() None[source]#

运行时关闭时调用

class AgentId(type: str | AgentType, key: str)[source]#

基类: object

Agent ID 在代理运行时(包括分布式运行时)中唯一标识一个代理实例。它是代理实例接收消息的“地址”。

更多信息请参见: Agent Identity and Lifecycle

classmethod from_str(agent_id: str) Self[source]#

将格式为 type/key 的字符串转换为 AgentId

property type: str#

标识符,用于将代理与特定的工厂函数关联。

字符串只能由字母数字字符 (a-z) 和 (0-9) 或下划线 (_) 组成。

property key: str#

代理实例标识符。

字符串只能由字母数字字符 (a-z) 和 (0-9) 或下划线 (_) 组成。

class AgentProxy(agent: AgentId, runtime: AgentRuntime)[source]#

基类: object

一个辅助类,允许您使用 AgentId 代替其关联的 Agent

property id: AgentId#

此代理的目标代理。

property metadata: Awaitable[AgentMetadata]#

代理的元数据。

async send_message(message: Any, *, sender: AgentId, cancellation_token: CancellationToken | None = None, message_id: str | None = None) Any[source]#
async save_state() Mapping[str, Any][source]#

保存代理的状态。结果必须是 JSON 可序列化的。

async load_state(state: Mapping[str, Any]) None[source]#

加载来自 save_state 的代理状态。

参数:

state (Mapping[str, Any]) – 代理的状态。必须是 JSON 可序列化的。

class AgentMetadata[source]#

基类: TypedDict

type: str#
key: str#
description: str#
class AgentRuntime(*args, **kwargs)[source]#

基类:Protocol

async send_message(message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) Any[source]#

向代理发送消息并获取响应。

参数:
  • message (Any) – 要发送的消息。

  • recipient (AgentId) – 要将消息发送到的代理。

  • sender (AgentId | None, optional) – 发送消息的代理。除非是从外部直接发送到运行时,否则 **仅** 应为 None。默认为 None。

  • cancellation_token (CancellationToken | None, optional) – 用于取消正在进行的 . 的令牌。默认为 None。

抛出:
返回:

Any – 来自代理的响应。

async publish_message(message: Any, topic_id: TopicId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) None[source]#

将消息发布到给定命名空间中的所有代理,或者如果未提供命名空间,则发布到发送者的命名空间。

发布消息不期望有响应。

参数:
  • message (Any) – 要发布的消息。

  • topic_id (TopicId) – 要将消息发布到的主题。

  • sender (AgentId | None, optional) – 发送消息的代理。默认为 None。

  • cancellation_token (CancellationToken | None, optional) – 用于取消正在进行的 . 的令牌。默认为 None。

  • message_id (str | None, optional) – 消息 ID。如果为 None,将生成新的消息 ID。默认为 None。此消息 ID 必须是唯一的。建议使用 UUID。

抛出:

UndeliverableException – 如果消息无法传递。

async register_factory(type: str | AgentType, agent_factory: Callable[[], T | Awaitable[T]], *, expected_class: type[T] | None = None) AgentType[source]#

使用运行时和特定类型注册代理工厂。类型必须是唯一的。此 API 不会添加任何订阅。

注意

这是一个底层 API,通常应使用代理类的 register 方法,因为它也会自动处理订阅。

示例

from dataclasses import dataclass

from autogen_core import AgentRuntime, MessageContext, RoutedAgent, event
from autogen_core.models import UserMessage


@dataclass
class MyMessage:
    content: str


class MyAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("My core agent")

    @event
    async def handler(self, message: UserMessage, context: MessageContext) -> None:
        print("Event received: ", message.content)


async def my_agent_factory():
    return MyAgent()


async def main() -> None:
    runtime: AgentRuntime = ...  # type: ignore
    await runtime.register_factory("my_agent", lambda: MyAgent())


import asyncio

asyncio.run(main())
参数:
  • type (str) – 此工厂创建的代理类型。它与代理类名不同。 type 参数用于区分不同的工厂函数而不是代理类。

  • agent_factory (Callable[[], T]) – 创建代理的工厂,其中 T 是具体的 Agent 类型。在工厂内部,使用 autogen_core.AgentInstantiationContext 来访问当前运行时和代理 ID 等变量。

  • expected_class (type[T] | None, optional) – 代理的预期类,用于运行时的工厂验证。默认为 None。如果为 None,则不执行验证。

async register_agent_instance(agent_instance: Agent, agent_id: AgentId) AgentId[source]#

使用运行时注册代理实例。类型可以被重用,但每个 agent_id 必须是唯一的。类型内的所有代理实例必须是同一对象类型。此 API 不会添加任何订阅。

注意

这是一个底层 API,通常应使用代理类的 register_instance 方法,因为它也会自动处理订阅。

示例

from dataclasses import dataclass

from autogen_core import AgentId, AgentRuntime, MessageContext, RoutedAgent, event
from autogen_core.models import UserMessage


@dataclass
class MyMessage:
    content: str


class MyAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("My core agent")

    @event
    async def handler(self, message: UserMessage, context: MessageContext) -> None:
        print("Event received: ", message.content)


async def main() -> None:
    runtime: AgentRuntime = ...  # type: ignore
    agent = MyAgent()
    await runtime.register_agent_instance(
        agent_instance=agent, agent_id=AgentId(type="my_agent", key="default")
    )


import asyncio

asyncio.run(main())
参数:
  • agent_instance (Agent) – 代理的具体实例。

  • agent_id (AgentId) – 代理的标识符。代理的类型是 agent_id.type

async try_get_underlying_agent_instance(id: AgentId, type: Type[T] = Agent) T[source]#

尝试按名称和命名空间获取底层代理实例。这通常不被推荐(因此名称很长),但在某些情况下可能有用。

如果底层代理无法访问,这将引发异常。

参数:
  • id (AgentId) – 代理 ID。

  • type (Type[T], optional) – 代理的预期类型。默认为 Agent。

返回:

T – 具体代理实例。

抛出:
async get(id: AgentId, /, *, lazy: bool = True) AgentId[source]#
async get(type: AgentType | str, /, key: str = 'default', *, lazy: bool = True) AgentId
async save_state() Mapping[str, Any][source]#

保存整个运行时的状态,包括所有托管的代理。恢复状态的唯一方法是将其传递给 load_state()

状态的结构由实现定义,可以是任何 JSON 可序列化的对象。

返回:

Mapping[str, Any] – 已保存的状态。

async load_state(state: Mapping[str, Any]) None[source]#

加载整个运行时的状态,包括所有托管的代理。状态应与 save_state() 返回的状态相同。

参数:

state (Mapping[str, Any]) – 已保存的状态。

async agent_metadata(agent: AgentId) AgentMetadata[source]#

获取代理的元数据。

参数:

agent (AgentId) – 代理 ID。

返回:

AgentMetadata – 代理元数据。

async agent_save_state(agent: AgentId) Mapping[str, Any][source]#

保存单个代理的状态。

状态的结构由实现定义,可以是任何 JSON 可序列化的对象。

参数:

agent (AgentId) – 代理 ID。

返回:

Mapping[str, Any] – 已保存的状态。

async agent_load_state(agent: AgentId, state: Mapping[str, Any]) None[source]#

加载单个代理的状态。

参数:
  • agent (AgentId) – 代理 ID。

  • state (Mapping[str, Any]) – 已保存的状态。

async add_subscription(subscription: Subscription) None[source]#

添加一个运行时应在处理已发布消息时满足的新订阅

参数:

subscription (Subscription) – 要添加的订阅

async remove_subscription(id: str) None[source]#

从运行时中删除订阅

参数:

id (str) – 要删除的订阅的 ID

抛出:

LookupError – 如果订阅不存在

add_message_serializer(serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) None[source]#

向运行时添加新的消息序列化器

注意:这将根据 type_name 和 data_content_type 属性去重序列化器

参数:

serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) – 要添加的序列化器/s

class BaseAgent(description: str)[source]#

Bases: ABC, Agent

property metadata: AgentMetadata#

代理的元数据。

async bind_id_and_runtime(id: AgentId, runtime: AgentRuntime) None[source]#

用于将 Agent 实例绑定到 AgentRuntime 的函数。

参数:
  • agent_id (AgentId) – 代理的 ID。

  • runtime (AgentRuntime) – 要将代理绑定的 AgentRuntime 实例。

property type: str#
property id: AgentId#

代理的 ID。

property runtime: AgentRuntime#
final async on_message(message: Any, ctx: MessageContext) Any[source]#

代理的消息处理程序。这应仅由运行时调用,而不是由其他代理调用。

参数:
  • message (Any) – 接收到的消息。类型是 subscriptions 中类型之一。

  • ctx (MessageContext) – 消息的上下文。

返回:

Any – 对消息的响应。可以是 None。

抛出:
abstract async on_message_impl(message: Any, ctx: MessageContext) Any[source]#
async send_message(message: Any, recipient: AgentId, *, cancellation_token: CancellationToken | None = None, message_id: str | None = None) Any[source]#

更多信息请参见 autogen_core.AgentRuntime.send_message()

async publish_message(message: Any, topic_id: TopicId, *, cancellation_token: CancellationToken | None = None) None[source]#
async save_state() Mapping[str, Any][source]#

保存代理的状态。结果必须是 JSON 可序列化的。

async load_state(state: Mapping[str, Any]) None[source]#

加载来自 save_state 的代理状态。

参数:

state (Mapping[str, Any]) – 代理的状态。必须是 JSON 可序列化的。

async close() None[source]#

运行时关闭时调用

async register_instance(runtime: AgentRuntime, agent_id: AgentId, *, skip_class_subscriptions: bool = True, skip_direct_message_subscription: bool = False) AgentId[source]#

此函数类似于 register,但用于注册代理实例。将创建基于代理 ID 的订阅并将其添加到运行时。

async classmethod register(runtime: AgentRuntime, type: str, factory: Callable[[], Self | Awaitable[Self]], *, skip_class_subscriptions: bool = False, skip_direct_message_subscription: bool = False) AgentType[source]#

注册 ABC 的虚拟子类。

返回子类,允许用作类装饰器。

class CacheStore[source]#

Bases: ABC, Generic[T], ComponentBase[BaseModel]

此协议定义了存储/缓存操作的基本接口。

子类应处理底层存储的生命周期。

component_type: ClassVar[ComponentType] = 'cache_store'#

组件的逻辑类型。

abstract get(key: str, default: T | None = None) T | None[source]#

从存储中检索项目。

参数:
  • key – 标识存储中项目的键。

  • default (optional) – 如果找不到键,则返回的默认值。默认为 None。

返回:

如果找到,则为与键关联的值,否则为默认值。

abstract set(key: str, value: T) None[source]#

在存储中设置一个项。

参数:
  • key – 要存储该项的键。

  • value – 要存储在存储中的值。

class InMemoryStore[source]#

Bases: CacheStore[T], Component[InMemoryStoreConfig]

component_provider_override: ClassVar[str | None] = 'autogen_core.InMemoryStore'#

覆盖组件的提供者字符串。这应该用于防止内部模块名称成为模块名称的一部分。

component_config_schema#

alias of InMemoryStoreConfig

get(key: str, default: T | None = None) T | None[source]#

从存储中检索项目。

参数:
  • key – 标识存储中项目的键。

  • default (optional) – 如果找不到键,则返回的默认值。默认为 None。

返回:

如果找到,则为与键关联的值,否则为默认值。

set(key: str, value: T) None[source]#

在存储中设置一个项。

参数:
  • key – 要存储该项的键。

  • value – 要存储在存储中的值。

_to_config() InMemoryStoreConfig[source]#

转储创建与此实例配置匹配的组件新实例所需的配置。

返回:

T – 组件的配置。

classmethod _from_config(config: InMemoryStoreConfig) Self[source]#

从配置对象创建组件的新实例。

参数:

config (T) – 配置对象。

返回:

Self – 组件的新实例。

class CancellationToken[source]#

基类: object

用于取消待处理异步调用的令牌。

cancel() None[source]#

取消与此取消令牌关联的待处理异步调用。

is_cancelled() bool[source]#

检查 CancellationToken 是否已被使用。

add_callback(callback: Callable[[], None]) None[source]#

添加一个在调用 cancel 时将被调用的回调。

将待处理的异步调用链接到一个令牌,以允许其被取消。

class AgentInstantiationContext[source]#

基类: object

一个静态类,提供代理实例化的上下文。

此静态类可用于在代理实例化过程中——在工厂函数或代理的类构造函数内部——访问当前的运行时和代理 ID。

示例

在工厂函数和代理的构造函数中获取当前的运行时和代理 ID。

import asyncio
from dataclasses import dataclass

from autogen_core import (
    AgentId,
    AgentInstantiationContext,
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    message_handler,
)


@dataclass
class TestMessage:
    content: str


class TestAgent(RoutedAgent):
    def __init__(self, description: str):
        super().__init__(description)
        # Get the current runtime -- we don't use it here, but it's available.
        _ = AgentInstantiationContext.current_runtime()
        # Get the current agent ID.
        agent_id = AgentInstantiationContext.current_agent_id()
        print(f"Current AgentID from constructor: {agent_id}")

    @message_handler
    async def handle_test_message(self, message: TestMessage, ctx: MessageContext) -> None:
        print(f"Received message: {message.content}")


def test_agent_factory() -> TestAgent:
    # Get the current runtime -- we don't use it here, but it's available.
    _ = AgentInstantiationContext.current_runtime()
    # Get the current agent ID.
    agent_id = AgentInstantiationContext.current_agent_id()
    print(f"Current AgentID from factory: {agent_id}")
    return TestAgent(description="Test agent")


async def main() -> None:
    # Create a SingleThreadedAgentRuntime instance.
    runtime = SingleThreadedAgentRuntime()

    # Start the runtime.
    runtime.start()

    # Register the agent type with a factory function.
    await runtime.register_factory("test_agent", test_agent_factory)

    # Send a message to the agent. The runtime will instantiate the agent and call the message handler.
    await runtime.send_message(TestMessage(content="Hello, world!"), AgentId("test_agent", "default"))

    # Stop the runtime.
    await runtime.stop()


asyncio.run(main())
classmethod current_runtime() AgentRuntime[source]#
classmethod current_agent_id() AgentId[source]#
classmethod is_in_factory_call() bool[source]#
class TopicId(type: str, source: str)[source]#

基类: object

TopicId 定义了广播消息的范围。本质上,代理运行时通过其广播 API 实现发布-订阅模型:发布消息时,必须指定主题。

更多信息请参见:主题

type: str#

此 topic_id 所包含的事件类型。遵循 CloudEvents 规范。

必须匹配模式:^[w-.:=]+Z

在此处了解更多信息:cloudevents/spec

source: str#

标识事件发生的上下文。遵循 CloudEvents 规范。

在此处了解更多信息:cloudevents/spec

classmethod from_str(topic_id: str) Self[source]#

将格式为 type/source 的字符串转换为 TopicId。

class Subscription(*args, **kwargs)[source]#

基类:Protocol

Subscriptions 定义了代理感兴趣的主题。

property id: str#

获取订阅的 ID。

实现应返回订阅的唯一 ID。通常这是一个 UUID。

返回:

str – 订阅的 ID。

is_match(topic_id: TopicId) bool[source]#

检查给定的 topic_id 是否与订阅匹配。

参数:

topic_id (TopicId) – 要检查的 TopicId。

返回:

bool – 如果 topic_id 与订阅匹配,则为 True,否则为 False。

map_to_agent(topic_id: TopicId) AgentId[source]#

将 topic_id 映射到代理。如果给定的 topic_id 的 is_match 返回 True,则应仅调用此方法。

参数:

topic_id (TopicId) – 要映射的 TopicId。

返回:

AgentId – 应处理 topic_id 的代理的 ID。

抛出:

CantHandleException – 如果订阅无法处理 topic_id。

class MessageContext(sender: AgentId | None, topic_id: TopicId | None, is_rpc: bool, cancellation_token: CancellationToken, message_id: str)[source]#

基类: object

sender: AgentId | None#
topic_id: TopicId | None#
is_rpc: bool#
cancellation_token: CancellationToken#
message_id: str#
class AgentType(type: str)[source]#

基类: object

type: str#

此代理类型的字符串表示。

class SubscriptionInstantiationContext[source]#

基类: object

classmethod agent_type() AgentType[source]#
class MessageHandlerContext[source]#

基类: object

classmethod agent_id() AgentId[source]#
class MessageSerializer(*args, **kwargs)[source]#

Bases: Protocol[T]

property data_content_type: str#
property type_name: str#
deserialize(payload: bytes) T[source]#
serialize(message: T) bytes[source]#
class UnknownPayload(type_name: str, data_content_type: str, payload: bytes)[source]#

基类: object

type_name: str#
data_content_type: str#
payload: bytes#
class Image(image: Image)[source]#

基类: object

表示图像。

示例

从 URL 加载图像。

from autogen_core import Image
from PIL import Image as PILImage
import aiohttp
import asyncio


async def from_url(url: str) -> Image:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            content = await response.read()
            return Image.from_pil(PILImage.open(content))


image = asyncio.run(from_url("https://example.com/image"))
classmethod from_pil(pil_image: Image) Image[source]#
classmethod from_uri(uri: str) Image[source]#
classmethod from_base64(base64_str: str) Image[source]#
to_base64() str[source]#
classmethod from_file(file_path: Path) Image[source]#
property data_uri: str#
to_openai_format(detail: Literal['auto', 'low', 'high'] = 'auto') Dict[str, Any][source]#
class RoutedAgent(description: str)[source]#

Bases: BaseAgent

基于消息类型和可选匹配函数的代理基类,用于将消息路由到处理程序。

要创建路由代理,请继承此类并使用 event()rpc() 装饰器将消息处理程序添加为方法。

示例

from dataclasses import dataclass
from autogen_core import MessageContext
from autogen_core import RoutedAgent, event, rpc


@dataclass
class Message:
    pass


@dataclass
class MessageWithContent:
    content: str


@dataclass
class Response:
    pass


class MyAgent(RoutedAgent):
    def __init__(self):
        super().__init__("MyAgent")

    @event
    async def handle_event_message(self, message: Message, ctx: MessageContext) -> None:
        assert ctx.topic_id is not None
        await self.publish_message(MessageWithContent("event handled"), ctx.topic_id)

    @rpc(match=lambda message, ctx: message.content == "special")  # type: ignore
    async def handle_special_rpc_message(self, message: MessageWithContent, ctx: MessageContext) -> Response:
        return Response()
async on_message_impl(message: Any, ctx: MessageContext) Any | None[source]#

通过将消息路由到适当的消息处理程序来处理消息。请不要在子类中覆盖此方法。而是将消息处理程序添加为使用 event()rpc() 装饰器修饰的方法。

async on_unhandled_message(message: Any, ctx: MessageContext) None[source]#

收到一条没有匹配的消息处理程序的收到的消息时调用。默认实现记录一条信息消息。

class ClosureAgent(description: str, closure: Callable[[ClosureContext, T, MessageContext], Awaitable[Any]], *, unknown_type_policy: Literal['error', 'warn', 'ignore'] = 'warn')[source]#

Bases: BaseAgent, ClosureContext

property metadata: AgentMetadata#

代理的元数据。

property id: AgentId#

代理的 ID。

property runtime: AgentRuntime#
async on_message_impl(message: Any, ctx: MessageContext) Any[source]#
async save_state() Mapping[str, Any][source]#

闭包代理没有状态。因此此方法始终返回一个空字典。

async load_state(state: Mapping[str, Any]) None[source]#

闭包代理没有状态。因此此方法不起作用。

async classmethod register_closure(runtime: AgentRuntime, type: str, closure: Callable[[ClosureContext, T, MessageContext], Awaitable[Any]], *, unknown_type_policy: Literal['error', 'warn', 'ignore'] = 'warn', skip_direct_message_subscription: bool = False, description: str = '', subscriptions: Callable[[], list[Subscription] | Awaitable[list[Subscription]]] | None = None) AgentType[source]#

闭包代理允许您使用闭包(函数)来定义代理,而无需定义类。它允许从运行时中提取值。

闭包可以定义预期的消息类型,或者可以使用 Any 来接受任何类型的消息。

示例

import asyncio
from autogen_core import SingleThreadedAgentRuntime, MessageContext, ClosureAgent, ClosureContext
from dataclasses import dataclass

from autogen_core._default_subscription import DefaultSubscription
from autogen_core._default_topic import DefaultTopicId


@dataclass
class MyMessage:
    content: str


async def main():
    queue = asyncio.Queue[MyMessage]()

    async def output_result(_ctx: ClosureContext, message: MyMessage, ctx: MessageContext) -> None:
        await queue.put(message)

    runtime = SingleThreadedAgentRuntime()
    await ClosureAgent.register_closure(
        runtime, "output_result", output_result, subscriptions=lambda: [DefaultSubscription()]
    )

    runtime.start()
    await runtime.publish_message(MyMessage("Hello, world!"), DefaultTopicId())
    await runtime.stop_when_idle()

    result = await queue.get()
    print(result)


asyncio.run(main())
参数:
  • runtime (AgentRuntime) – 要注册代理的运行时。

  • type (str) – 注册代理的代理类型。

  • closure (Callable[[ClosureContext, T, MessageContext], Awaitable[Any]]) – 用于处理消息的闭包。

  • unknown_type_policy (Literal["error", "warn", "ignore"], optional) – 如果遇到与闭包类型不匹配的类型时该怎么办。默认为“warn”。

  • skip_direct_message_subscription (bool, optional) – 不要为此代理添加直接消息订阅。默认为 False。

  • description (str, optional) – 对代理功能的描述。默认为“”.

  • subscriptions (Callable[[], list[Subscription] | Awaitable[list[Subscription]]] | None, optional) – 此闭包代理的订阅列表。默认为 None。

返回:

AgentType – 已注册代理的类型。

class ClosureContext(*args, **kwargs)[source]#

基类:Protocol

property id: AgentId#
async send_message(message: Any, recipient: AgentId, *, cancellation_token: CancellationToken | None = None, message_id: str | None = None) Any[source]#
async publish_message(message: Any, topic_id: TopicId, *, cancellation_token: CancellationToken | None = None) None[source]#
message_handler(func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]] = None, *, strict: bool = True, match: None | Callable[[ReceivesT, MessageContext], bool] = None) Callable[[Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]]], MessageHandler[AgentT, ReceivesT, ProducesT]] | MessageHandler[AgentT, ReceivesT, ProducesT][source]#

泛型消息处理程序的装饰器。

将此装饰器添加到 RoutedAgent 类中的方法,这些方法旨在处理事件消息和 RPC 消息。这些方法必须具有特定的签名,并且必须遵循该签名才能使其有效。

  • 该方法必须是 async 方法。

  • 该方法必须用 @message_handler 装饰器进行装饰。

  • 该方法必须有 3 个参数。
    1. self

    2. message:要处理的消息,必须用其预期处理的消息类型进行类型提示。

    3. ctx:一个 autogen_core.MessageContext 对象。

  • 该方法必须用它作为响应可以返回的消息类型进行类型提示,或者如果它不返回任何内容,则可以返回 None

处理程序可以通过接受消息类型的联合来处理多种消息类型。它也可以通过返回消息类型的联合来返回多种消息类型。

参数:
  • func – 要装饰的函数。

  • strict – 如果为 True,则在消息类型或返回类型不在目标类型中时,处理程序将引发异常。如果为 False,则会记录一条警告。

  • match – 一个函数,它将消息和上下文作为参数并返回一个布尔值。这用于在消息类型之后进行二次路由。对于处理相同消息类型的处理程序,匹配函数将按字母顺序应用处理程序,并且将调用第一个匹配的处理程序,而跳过其余的处理程序。如果为 None,则将调用第一个按字母顺序匹配相同消息类型的处理程序。

event(func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, None]] = None, *, strict: bool = True, match: None | Callable[[ReceivesT, MessageContext], bool] = None) Callable[[Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, None]]], MessageHandler[AgentT, ReceivesT, None]] | MessageHandler[AgentT, ReceivesT, None][source]#

事件消息处理器的装饰器。

将此装饰器添加到 RoutedAgent 类中旨在处理事件消息的方法上。这些方法必须遵循特定的签名才能使其有效

  • 该方法必须是 async 方法。

  • 该方法必须用 @message_handler 装饰器进行装饰。

  • 该方法必须有 3 个参数。
    1. self

    2. message:要处理的事件消息,此消息必须进行类型提示,指示其要处理的消息类型。

    3. ctx:一个 autogen_core.MessageContext 对象。

  • 该方法必须返回 None

通过接受消息类型的联合,处理程序可以处理多种消息类型。

参数:
  • func – 要装饰的函数。

  • strict – 如果为 True,当消息类型不在目标类型中时,处理程序将引发异常。如果为 False,它将改为记录警告。

  • match – 一个函数,它将消息和上下文作为参数并返回一个布尔值。这用于在消息类型之后进行二次路由。对于处理相同消息类型的处理程序,匹配函数将按字母顺序应用处理程序,并且将调用第一个匹配的处理程序,而跳过其余的处理程序。如果为 None,则将调用第一个按字母顺序匹配相同消息类型的处理程序。

rpc(func: None | Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]] = None, *, strict: bool = True, match: None | Callable[[ReceivesT, MessageContext], bool] = None) Callable[[Callable[[AgentT, ReceivesT, MessageContext], Coroutine[Any, Any, ProducesT]]], MessageHandler[AgentT, ReceivesT, ProducesT]] | MessageHandler[AgentT, ReceivesT, ProducesT][source]#

RPC 消息处理器的装饰器。

将此装饰器添加到 RoutedAgent 类中旨在处理 RPC 消息的方法上。这些方法必须遵循特定的签名才能使其有效

  • 该方法必须是 async 方法。

  • 该方法必须用 @message_handler 装饰器进行装饰。

  • 该方法必须有 3 个参数。
    1. self

    2. message:要处理的消息,必须用其预期处理的消息类型进行类型提示。

    3. ctx:一个 autogen_core.MessageContext 对象。

  • 该方法必须用它作为响应可以返回的消息类型进行类型提示,或者如果它不返回任何内容,则可以返回 None

处理程序可以通过接受消息类型的联合来处理多种消息类型。它也可以通过返回消息类型的联合来返回多种消息类型。

参数:
  • func – 要装饰的函数。

  • strict – 如果为 True,则在消息类型或返回类型不在目标类型中时,处理程序将引发异常。如果为 False,则会记录一条警告。

  • match – 一个函数,它将消息和上下文作为参数并返回一个布尔值。这用于在消息类型之后进行二次路由。对于处理相同消息类型的处理程序,匹配函数将按字母顺序应用处理程序,并且将调用第一个匹配的处理程序,而跳过其余的处理程序。如果为 None,则将调用第一个按字母顺序匹配相同消息类型的处理程序。

class FunctionCall(id: 'str', arguments: 'str', name: 'str')[source]#

基类: object

id: str#
arguments: str#
name: str#
class TypeSubscription(topic_type: str, agent_type: str | AgentType, id: str | None = None)[source]#

Bases: Subscription

此订阅根据主题类型匹配,并使用主题的源作为代理键映射到代理。

此订阅导致每个源都有自己的代理实例。

示例

from autogen_core import TypeSubscription

subscription = TypeSubscription(topic_type="t1", agent_type="a1")

在这种情况下

  • 类型为 t1、源为 s1 的 topic_id 将由键为 s1a1 类型的代理处理。

  • 类型为 t1、源为 s2 的 topic_id 将由键为 s2a1 类型的代理处理。

参数:
  • topic_type (str) – 要匹配的主题类型

  • agent_type (str) – 处理此订阅的代理类型

property id: str#

获取订阅的 ID。

实现应返回订阅的唯一 ID。通常这是一个 UUID。

返回:

str – 订阅的 ID。

property topic_type: str#
property agent_type: str#
is_match(topic_id: TopicId) bool[source]#

检查给定的 topic_id 是否与订阅匹配。

参数:

topic_id (TopicId) – 要检查的 TopicId。

返回:

bool – 如果 topic_id 与订阅匹配,则为 True,否则为 False。

map_to_agent(topic_id: TopicId) AgentId[source]#

将 topic_id 映射到代理。如果给定的 topic_id 的 is_match 返回 True,则应仅调用此方法。

参数:

topic_id (TopicId) – 要映射的 TopicId。

返回:

AgentId – 应处理 topic_id 的代理的 ID。

抛出:

CantHandleException – 如果订阅无法处理 topic_id。

class DefaultSubscription(topic_type: str = 'default', agent_type: str | AgentType | None = None)[source]#

Bases: TypeSubscription

默认订阅旨在为仅需要代理全局范围的应用程序提供合理的默认值。

此主题默认使用“default”主题类型,并尝试根据实例化上下文检测要使用的代理类型。

参数:
  • topic_type (str, optional) – 要订阅的主题类型。默认为“default”。

  • agent_type (str, optional) – 用于订阅的代理类型。默认为 None,在这种情况下,它将尝试根据实例化上下文检测代理类型。

class DefaultTopicId(type: str = 'default', source: str | None = None)[source]#

Bases: TopicId

DefaultTopicId 为 TopicId 的 topic_id 和 source 字段提供了合理的默认值。

如果在消息处理器的上下文中创建,则 source 将设置为消息处理器的 agent_id,否则将设置为“default”。

参数:
  • type (str, optional) – 要将消息发布到的主题类型。默认为“default”。

  • source (str | None, optional) – 要将消息发布到的主题源。如果为 None,则 source 将设置为消息处理器的 agent_id,如果处于消息处理器上下文中,否则将设置为“default”。默认为 None。

default_subscription(cls: Type[BaseAgentType] | None = None) Callable[[Type[BaseAgentType]], Type[BaseAgentType]] | Type[BaseAgentType][source]#
type_subscription(topic_type: str) Callable[[Type[BaseAgentType]], Type[BaseAgentType]][source]#
class TypePrefixSubscription(topic_type_prefix: str, agent_type: str | AgentType, id: str | None = None)[source]#

Bases: Subscription

此订阅根据类型前缀匹配主题,并使用主题的源作为代理键映射到代理。

此订阅导致每个源都有自己的代理实例。

示例

from autogen_core import TypePrefixSubscription

subscription = TypePrefixSubscription(topic_type_prefix="t1", agent_type="a1")

在这种情况下

  • 类型为 t1、源为 s1 的 topic_id 将由键为 s1a1 类型的代理处理。

  • 类型为 t1、源为 s2 的 topic_id 将由键为 s2a1 类型的代理处理。

  • 类型为 t1SUFFIX、源为 s2 的 topic_id 将由键为 s2a1 类型的代理处理。

参数:
  • topic_type_prefix (str) – 要匹配的主题类型前缀

  • agent_type (str) – 处理此订阅的代理类型

property id: str#

获取订阅的 ID。

实现应返回订阅的唯一 ID。通常这是一个 UUID。

返回:

str – 订阅的 ID。

property topic_type_prefix: str#
property agent_type: str#
is_match(topic_id: TopicId) bool[source]#

检查给定的 topic_id 是否与订阅匹配。

参数:

topic_id (TopicId) – 要检查的 TopicId。

返回:

bool – 如果 topic_id 与订阅匹配,则为 True,否则为 False。

map_to_agent(topic_id: TopicId) AgentId[source]#

将 topic_id 映射到代理。如果给定的 topic_id 的 is_match 返回 True,则应仅调用此方法。

参数:

topic_id (TopicId) – 要映射的 TopicId。

返回:

AgentId – 应处理 topic_id 的代理的 ID。

抛出:

CantHandleException – 如果订阅无法处理 topic_id。

JSON_DATA_CONTENT_TYPE = 'application/json'#

JSON 数据的 Content-Type。

PROTOBUF_DATA_CONTENT_TYPE = 'application/x-protobuf'#

Protobuf 数据的 Content-Type。

class SingleThreadedAgentRuntime(*, intervention_handlers: List[InterventionHandler] | None = None, tracer_provider: TracerProvider | None = None, ignore_unhandled_exceptions: bool = True)[source]#

Bases: AgentRuntime

单线程代理运行时,它使用单个 asyncio 队列处理所有消息。消息按接收顺序传递,并且运行时在单独的 asyncio 任务中并发处理每条消息。

注意

此运行时适用于开发和独立应用程序。不适用于高吞吐量或高并发场景。

参数:
  • intervention_handlers (List[InterventionHandler], optional) – 可以在消息发送或发布之前拦截消息的干预处理程序列表。默认为 None。

  • tracer_provider (TracerProvider, optional) – 用于跟踪的跟踪器提供程序。默认为 None。此外,您可以设置环境变量 AUTOGEN_DISABLE_RUNTIME_TRACINGtrue 以禁用代理运行时遥测,如果您没有访问运行时构造函数。例如,如果您使用的是 ComponentConfig

  • ignore_unhandled_exceptions (bool, optional) – 是否忽略代理事件处理程序中发生的未处理异常。任何后台异常将在下一次调用 process_next 或从已等待的 stopstop_when_idlestop_when 中引发。请注意,这不适用于 RPC 处理程序。默认为 True。

示例

创建运行时、注册代理、发送消息并停止运行时的简单示例

import asyncio
from dataclasses import dataclass

from autogen_core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler


@dataclass
class MyMessage:
    content: str


class MyAgent(RoutedAgent):
    @message_handler
    async def handle_my_message(self, message: MyMessage, ctx: MessageContext) -> None:
        print(f"Received message: {message.content}")


async def main() -> None:
    # Create a runtime and register the agent
    runtime = SingleThreadedAgentRuntime()
    await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My agent"))

    # Start the runtime, send a message and stop the runtime
    runtime.start()
    await runtime.send_message(MyMessage("Hello, world!"), recipient=AgentId("my_agent", "default"))
    await runtime.stop()


asyncio.run(main())

创建运行时、注册代理、发布消息并停止运行时的示例

import asyncio
from dataclasses import dataclass

from autogen_core import (
    DefaultTopicId,
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    default_subscription,
    message_handler,
)


@dataclass
class MyMessage:
    content: str


# The agent is subscribed to the default topic.
@default_subscription
class MyAgent(RoutedAgent):
    @message_handler
    async def handle_my_message(self, message: MyMessage, ctx: MessageContext) -> None:
        print(f"Received message: {message.content}")


async def main() -> None:
    # Create a runtime and register the agent
    runtime = SingleThreadedAgentRuntime()
    await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My agent"))

    # Start the runtime.
    runtime.start()
    # Publish a message to the default topic that the agent is subscribed to.
    await runtime.publish_message(MyMessage("Hello, world!"), DefaultTopicId())
    # Wait for the message to be processed and then stop the runtime.
    await runtime.stop_when_idle()


asyncio.run(main())
property unprocessed_messages_count: int#
async send_message(message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) Any[source]#

向代理发送消息并获取响应。

参数:
  • message (Any) – 要发送的消息。

  • recipient (AgentId) – 要将消息发送到的代理。

  • sender (AgentId | None, optional) – 发送消息的代理。除非是从外部直接发送到运行时,否则 **仅** 应为 None。默认为 None。

  • cancellation_token (CancellationToken | None, optional) – 用于取消正在进行的 . 的令牌。默认为 None。

抛出:
返回:

Any – 来自代理的响应。

async publish_message(message: Any, topic_id: TopicId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) None[source]#

将消息发布到给定命名空间中的所有代理,或者如果未提供命名空间,则发布到发送者的命名空间。

发布消息不期望有响应。

参数:
  • message (Any) – 要发布的消息。

  • topic_id (TopicId) – 要将消息发布到的主题。

  • sender (AgentId | None, optional) – 发送消息的代理。默认为 None。

  • cancellation_token (CancellationToken | None, optional) – 用于取消正在进行的 . 的令牌。默认为 None。

  • message_id (str | None, optional) – 消息 ID。如果为 None,将生成新的消息 ID。默认为 None。此消息 ID 必须是唯一的。建议使用 UUID。

抛出:

UndeliverableException – 如果消息无法传递。

async save_state() Mapping[str, Any][source]#

保存所有实例化代理的状态。

此方法调用每个代理上的 save_state() 方法,并返回一个将代理 ID 映射到其状态的字典。

注意

此方法目前不保存订阅状态。我们将在未来添加此功能。

返回:

一个将代理 ID 映射到其状态的字典。

async load_state(state: Mapping[str, Any]) None[source]#

加载所有实例化代理的状态。

此方法使用字典中提供的状态,在每个代理上调用 load_state() 方法。字典的键是代理 ID,值是 save_state() 方法返回的状态字典。

注意

此方法目前不加载订阅状态。我们将在未来添加此功能。

async process_next() None[source]#

处理队列中的下一条消息。

如果在后台任务中发生未处理的异常,它将在此处引发。process_next 在引发未处理异常后无法再次调用。

start() None[source]#

启动运行时消息处理循环。这将在后台任务中运行。

示例

import asyncio
from autogen_core import SingleThreadedAgentRuntime


async def main() -> None:
    runtime = SingleThreadedAgentRuntime()
    runtime.start()

    # ... do other things ...

    await runtime.stop()


asyncio.run(main())
async close() None[source]#

如果适用,则调用 stop() 以及所有实例化代理上的 Agent.close() 方法

async stop() None[source]#

立即停止运行时消息处理循环。当前正在处理的消息将完成,但之后的所有其他消息将被丢弃。

async stop_when_idle() None[source]#

当没有正在处理或排队的消息时,停止运行时消息处理循环。这是停止运行时的最常见方式。

async stop_when(condition: Callable[[], bool]) None[source]#

当满足条件时,停止运行时消息处理循环。

注意

不建议使用此方法,它仅用于旧版。它会启动一个忙碌的循环来不断检查条件。调用 stop_when_idlestop 更有效。如果您需要根据条件停止运行时,请考虑使用后台任务和 asyncio.Event 来发出信号,指示何时满足条件以及后台任务应调用 stop。

async agent_metadata(agent: AgentId) AgentMetadata[source]#

获取代理的元数据。

参数:

agent (AgentId) – 代理 ID。

返回:

AgentMetadata – 代理元数据。

async agent_save_state(agent: AgentId) Mapping[str, Any][source]#

保存单个代理的状态。

状态的结构由实现定义,可以是任何 JSON 可序列化的对象。

参数:

agent (AgentId) – 代理 ID。

返回:

Mapping[str, Any] – 已保存的状态。

async agent_load_state(agent: AgentId, state: Mapping[str, Any]) None[source]#

加载单个代理的状态。

参数:
  • agent (AgentId) – 代理 ID。

  • state (Mapping[str, Any]) – 已保存的状态。

async register_factory(type: str | AgentType, agent_factory: Callable[[], T | Awaitable[T]], *, expected_class: type[T] | None = None) AgentType[source]#

使用运行时和特定类型注册代理工厂。类型必须是唯一的。此 API 不会添加任何订阅。

注意

这是一个底层 API,通常应使用代理类的 register 方法,因为它也会自动处理订阅。

示例

from dataclasses import dataclass

from autogen_core import AgentRuntime, MessageContext, RoutedAgent, event
from autogen_core.models import UserMessage


@dataclass
class MyMessage:
    content: str


class MyAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("My core agent")

    @event
    async def handler(self, message: UserMessage, context: MessageContext) -> None:
        print("Event received: ", message.content)


async def my_agent_factory():
    return MyAgent()


async def main() -> None:
    runtime: AgentRuntime = ...  # type: ignore
    await runtime.register_factory("my_agent", lambda: MyAgent())


import asyncio

asyncio.run(main())
参数:
  • type (str) – 此工厂创建的代理类型。它与代理类名不同。 type 参数用于区分不同的工厂函数而不是代理类。

  • agent_factory (Callable[[], T]) – 创建代理的工厂,其中 T 是具体的 Agent 类型。在工厂内部,使用 autogen_core.AgentInstantiationContext 来访问当前运行时和代理 ID 等变量。

  • expected_class (type[T] | None, optional) – 代理的预期类,用于运行时的工厂验证。默认为 None。如果为 None,则不执行验证。

async register_agent_instance(agent_instance: Agent, agent_id: AgentId) AgentId[source]#

使用运行时注册代理实例。类型可以被重用,但每个 agent_id 必须是唯一的。类型内的所有代理实例必须是同一对象类型。此 API 不会添加任何订阅。

注意

这是一个底层 API,通常应使用代理类的 register_instance 方法,因为它也会自动处理订阅。

示例

from dataclasses import dataclass

from autogen_core import AgentId, AgentRuntime, MessageContext, RoutedAgent, event
from autogen_core.models import UserMessage


@dataclass
class MyMessage:
    content: str


class MyAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("My core agent")

    @event
    async def handler(self, message: UserMessage, context: MessageContext) -> None:
        print("Event received: ", message.content)


async def main() -> None:
    runtime: AgentRuntime = ...  # type: ignore
    agent = MyAgent()
    await runtime.register_agent_instance(
        agent_instance=agent, agent_id=AgentId(type="my_agent", key="default")
    )


import asyncio

asyncio.run(main())
参数:
  • agent_instance (Agent) – 代理的具体实例。

  • agent_id (AgentId) – 代理的标识符。代理的类型是 agent_id.type

async try_get_underlying_agent_instance(id: AgentId, type: Type[T] = Agent) T[source]#

尝试按名称和命名空间获取底层代理实例。这通常不被推荐(因此名称很长),但在某些情况下可能有用。

如果底层代理无法访问,这将引发异常。

参数:
  • id (AgentId) – 代理 ID。

  • type (Type[T], optional) – 代理的预期类型。默认为 Agent。

返回:

T – 具体代理实例。

抛出:
async add_subscription(subscription: Subscription) None[source]#

添加一个运行时应在处理已发布消息时满足的新订阅

参数:

subscription (Subscription) – 要添加的订阅

async remove_subscription(id: str) None[source]#

从运行时中删除订阅

参数:

id (str) – 要删除的订阅的 ID

抛出:

LookupError – 如果订阅不存在

async get(id_or_type: AgentId | AgentType | str, /, key: str = 'default', *, lazy: bool = True) AgentId[source]#
add_message_serializer(serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) None[source]#

向运行时添加新的消息序列化器

注意:这将根据 type_name 和 data_content_type 属性去重序列化器

参数:

serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) – 要添加的序列化器/s

ROOT_LOGGER_NAME = 'autogen_core'#

根日志记录器的名称。

EVENT_LOGGER_NAME = 'autogen_core.events'#

用于结构化事件的日志记录器的名称。

TRACE_LOGGER_NAME = 'autogen_core.trace'#

用于开发人员预期跟踪日志记录的记录器名称。此日志的内容和格式不应被依赖。

class Component[source]#

Bases: ComponentFromConfig[ConfigT], ComponentSchemaType[ConfigT], Generic[ConfigT]

要创建组件类,请继承此类以获取具体类,并继承 ComponentBase 以获取接口。然后实现两个类变量

  • component_config_schema - 代表组件配置的 Pydantic 模型类。这也是 Component 的类型参数。

  • component_type - 组件的逻辑类型是什么。

示例

from __future__ import annotations

from pydantic import BaseModel
from autogen_core import Component


class Config(BaseModel):
    value: str


class MyComponent(Component[Config]):
    component_type = "custom"
    component_config_schema = Config

    def __init__(self, value: str):
        self.value = value

    def _to_config(self) -> Config:
        return Config(value=self.value)

    @classmethod
    def _from_config(cls, config: Config) -> MyComponent:
        return cls(value=config.value)
class ComponentBase[source]#

Bases: ComponentToConfig[ConfigT], ComponentLoader, Generic[ConfigT]

class ComponentFromConfig[source]#

Bases: Generic[FromConfigT]

classmethod _from_config(config: FromConfigT) Self[source]#

从配置对象创建组件的新实例。

参数:

config (T) – 配置对象。

返回:

Self – 组件的新实例。

classmethod _from_config_past_version(config: Dict[str, Any], version: int) Self[source]#

从以前版本的配置对象创建组件的新实例。

仅当配置对象的版本小于当前版本时才调用此方法,因为在这种情况下,模式是未知的。

参数:
  • config (Dict[str, Any]) – 配置对象。

  • version (int) – 配置对象的版本。

返回:

Self – 组件的新实例。

class ComponentLoader[source]#

基类: object

classmethod load_component(model: ComponentModel | Dict[str, Any], expected: None = None) Self[source]#
classmethod load_component(model: ComponentModel | Dict[str, Any], expected: Type[ExpectedType]) ExpectedType

从模型加载组件。 intended to be used with the return type of autogen_core.ComponentConfig.dump_component()

示例

from autogen_core import ComponentModel
from autogen_core.models import ChatCompletionClient

component: ComponentModel = ...  # type: ignore

model_client = ChatCompletionClient.load_component(component)
参数:
  • model (ComponentModel) – 要从中加载组件的模型。

  • model – _description_

  • expected (Type[ExpectedType] | None, optional) – 仅当直接在 ComponentLoader 上使用时才需要显式类型。默认为 None。

返回:

Self – 加载的组件。

抛出:
  • ValueError – 如果 provider 字符串无效。

  • TypeError – Provider 不是 ComponentConfigImpl 的子类,或者预期的类型不匹配。

返回:

Self | ExpectedType – 加载的组件。

pydantic model ComponentModel[source]#

基类: BaseModel

组件的模型类。包含实例化组件所需的所有信息。

显示 JSON 模式
{
   "title": "ComponentModel",
   "description": "Model class for a component. Contains all information required to instantiate a component.",
   "type": "object",
   "properties": {
      "provider": {
         "title": "Provider",
         "type": "string"
      },
      "component_type": {
         "anyOf": [
            {
               "enum": [
                  "model",
                  "agent",
                  "tool",
                  "termination",
                  "token_provider",
                  "workbench"
               ],
               "type": "string"
            },
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "title": "Component Type"
      },
      "version": {
         "anyOf": [
            {
               "type": "integer"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "title": "Version"
      },
      "component_version": {
         "anyOf": [
            {
               "type": "integer"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "title": "Component Version"
      },
      "description": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "title": "Description"
      },
      "label": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "title": "Label"
      },
      "config": {
         "title": "Config",
         "type": "object"
      }
   },
   "required": [
      "provider",
      "config"
   ]
}

字段:
  • component_type (Literal['model', 'agent', 'tool', 'termination', 'token_provider', 'workbench'] | str | None)

  • component_version (int | None)

  • config (dict[str, Any])

  • description (str | None)

  • label (str | None)

  • provider (str)

  • version (int | None)

field provider: str [Required]#

描述组件如何实例化。

field component_type: ComponentType | None = None#

组件的逻辑类型。如果缺失,则组件假定 provider 的默认类型。

field version: int | None = None#

组件规范的版本。如果缺失,则组件假定加载它的库的当前版本。这显然很危险,应该用于用户编写的临时配置。对于所有其他配置,应指定版本。

field component_version: int | None = None#

组件的版本。如果缺失,则组件假定 provider 的默认版本。

field description: str | None = None#

组件的描述。

field label: str | None = None#

组件的可读标签。如果缺失,则组件假定 provider 的类名。

field config: dict[str, Any] [Required]#

Schema 验证的 config 字段将传递给给定类的 autogen_core.ComponentConfigImpl._from_config() 实现,以创建组件类的新实例。

class ComponentSchemaType[source]#

Bases: Generic[ConfigT]

component_config_schema: Type[ConfigT]#

表示组件配置的 Pydantic 模型类。

required_class_vars = ['component_config_schema', 'component_type']#
class ComponentToConfig[source]#

Bases: Generic[ToConfigT]

一个类必须实现的两个方法才能成为组件。

参数:

Protocol (ConfigT) – 派生自 pydantic.BaseModel 的类型。

component_type: ClassVar[Literal['model', 'agent', 'tool', 'termination', 'token_provider', 'workbench'] | str]#

组件的逻辑类型。

component_version: ClassVar[int] = 1#

组件的版本,如果引入了 schema 不兼容性,则应更新此版本。

component_provider_override: ClassVar[str | None] = None#

覆盖组件的提供者字符串。这应该用于防止内部模块名称成为模块名称的一部分。

component_description: ClassVar[str | None] = None#

组件的描述。如果未提供,则使用类的 docstring。

component_label: ClassVar[str | None] = None#

组件的可读标签。如果未提供,则使用组件类名。

_to_config() ToConfigT[source]#

转储创建与此实例配置匹配的组件新实例所需的配置。

返回:

T – 组件的配置。

dump_component() ComponentModel[source]#

将组件转储为可以重新加载的模型。

抛出:

TypeError – 如果组件是本地类。

返回:

ComponentModel – 代表组件的模型。

is_component_class(cls: type) TypeGuard[Type[_ConcreteComponent[BaseModel]]][source]#
is_component_instance(cls: Any) TypeGuard[_ConcreteComponent[BaseModel]][source]#
final class DropMessage[source]#

基类: object

用于指示干预处理程序应丢弃消息的标记类型。该类型本身应从处理程序返回。

class InterventionHandler(*args, **kwargs)[source]#

基类:Protocol

干预处理程序是一个类,可用于修改、记录或丢弃由 autogen_core.base.AgentRuntime 处理的消息。

消息提交到运行时时会调用该处理程序。

目前唯一支持此功能的运行时是 autogen_core.base.SingleThreadedAgentRuntime

注意:从任何干预处理程序方法返回 None 将导致发出警告,并被视为“无更改”。如果您打算丢弃消息,则应显式返回 DropMessage

示例

from autogen_core import DefaultInterventionHandler, MessageContext, AgentId, SingleThreadedAgentRuntime
from dataclasses import dataclass
from typing import Any


@dataclass
class MyMessage:
    content: str


class MyInterventionHandler(DefaultInterventionHandler):
    async def on_send(self, message: Any, *, message_context: MessageContext, recipient: AgentId) -> MyMessage:
        if isinstance(message, MyMessage):
            message.content = message.content.upper()
        return message


runtime = SingleThreadedAgentRuntime(intervention_handlers=[MyInterventionHandler()])
async on_send(message: Any, *, message_context: MessageContext, recipient: AgentId) Any | type[DropMessage][source]#

当通过 autogen_core.base.AgentRuntime.send_message() 将消息提交给 AgentRuntime 时调用。

async on_publish(message: Any, *, message_context: MessageContext) Any | type[DropMessage][source]#

当通过 autogen_core.base.AgentRuntime.publish_message() 将消息发布到 AgentRuntime 时调用。

async on_response(message: Any, *, sender: AgentId, recipient: AgentId | None) Any | type[DropMessage][source]#

当 Agent 的消息处理程序返回一个值时,AgentRuntime 收到响应时调用。

class DefaultInterventionHandler(*args, **kwargs)[source]#

Bases: InterventionHandler

提供所有干预处理程序方法的默认实现的简单类,这些方法只是返回未更改的消息。允许轻松子类化以仅覆盖所需的方法。

async on_send(message: Any, *, message_context: MessageContext, recipient: AgentId) Any | type[DropMessage][source]#

当通过 autogen_core.base.AgentRuntime.send_message() 将消息提交给 AgentRuntime 时调用。

async on_publish(message: Any, *, message_context: MessageContext) Any | type[DropMessage][source]#

当通过 autogen_core.base.AgentRuntime.publish_message() 将消息发布到 AgentRuntime 时调用。

async on_response(message: Any, *, sender: AgentId, recipient: AgentId | None) Any | type[DropMessage][source]#

当 Agent 的消息处理程序返回一个值时,AgentRuntime 收到响应时调用。

trace_create_agent_span(agent_name: str, *, tracer: Tracer | None = None, parent: Span | None = None, agent_id: str | None = None, agent_description: str | None = None) Generator[Span, Any, None][source]#

用于创建代理的 span 的上下文管理器,遵循 OpenTelemetry 生成式 AI 系统语义约定。

请参阅 GenAI 语义约定文档: OpenTelemetry GenAI 语义约定

警告

GenAI 语义约定仍处于孵化阶段,可能会在后续版本中发生更改。

参数:
  • agent_name (str) – 要创建的代理的名称。

  • tracer (Optional[trace.Tracer]) – 用于创建 span 的 tracer。

  • parent (Optional[Span]) – 要链接此 span 的父 span。

  • agent_id (Optional[str]) – 代理的唯一标识符。

  • agent_description (Optional[str]) – 代理的描述。

trace_invoke_agent_span(agent_name: str, *, tracer: Tracer | None = None, parent: Span | None = None, agent_id: str | None = None, agent_description: str | None = None) Generator[Span, Any, None][source]#

用于调用代理的 span 的上下文管理器,遵循 OpenTelemetry 生成式 AI 系统语义约定。

请参阅 GenAI 语义约定文档: OpenTelemetry GenAI 语义约定

警告

GenAI 语义约定仍处于孵化阶段,可能会在后续版本中发生更改。

参数:
  • agent_name (str) – 要调用的代理的名称。

  • tracer (Optional[trace.Tracer]) – 用于创建 span 的 tracer。

  • parent (Optional[Span]) – 要链接此 span 的父 span。

  • agent_id (Optional[str]) – 代理的唯一标识符。

  • agent_description (Optional[str]) – 代理的描述。

trace_tool_span(tool_name: str, *, tracer: Tracer | None = None, parent: Span | None = None, tool_description: str | None = None, tool_call_id: str | None = None) Generator[Span, Any, None][source]#

用于工具执行的 span 的上下文管理器,遵循 OpenTelemetry 生成式 AI 系统语义约定。

请参阅 GenAI 语义约定文档: OpenTelemetry GenAI 语义约定

警告

GenAI 语义约定仍处于孵化阶段,可能会在后续版本中发生更改。

参数:
  • tool_name (str) – 要执行的工具的名称。

  • tracer (Optional[trace.Tracer]) – 用于创建 span 的 tracer。

  • parent (Optional[Span]) – 要链接此 span 的父 span。

  • tool_description (Optional[str]) – 工具的描述。

  • tool_call_id (Optional[str]) – 工具调用的唯一标识符。