autogen_ext.runtimes.grpc#
- class GrpcWorkerAgentRuntime(host_address: str, tracer_provider: TracerProvider | None = None, extra_grpc_config: Sequence[Tuple[str, Any]] | None = None, payload_serialization_format: str = JSON_DATA_CONTENT_TYPE)[source]#
基类:
AgentRuntime
用于运行远程或跨语言代理的代理运行时。
代理消息传递使用来自 agent_worker.proto 的 protobufs 和来自 cloudevent.proto 的
CloudEvent
。跨语言代理还将要求所有代理对代理之间发送的任何消息类型使用共享的 protobuf 模式。
- add_message_serializer(serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) None [source]#
向运行时添加新的消息序列化序列化器
注意:这将基于 type_name 和 data_content_type 属性对序列化器进行重复数据删除
- 参数:
serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) – 要添加的序列化器
- async add_subscription(subscription: Subscription) None [source]#
添加一个新的订阅,运行时在处理已发布的消息时应履行该订阅
- 参数:
subscription (Subscription) – 要添加的订阅
- async agent_metadata(agent: AgentId) AgentMetadata [source]#
获取代理的元数据。
- 参数:
agent (AgentId) – 代理 ID。
- 返回:
AgentMetadata – 代理元数据。
- async agent_save_state(agent: AgentId) Mapping[str, Any] [source]#
保存单个代理的状态。
状态的结构是实现定义的,可以是任何 JSON 可序列化的对象。
- 参数:
agent (AgentId) – 代理 ID。
- 返回:
Mapping[str, Any] – 保存的状态。
- async get(id_or_type: AgentId | AgentType | str, /, key: str = 'default', *, lazy: bool = True) AgentId [source]#
- async load_state(state: Mapping[str, Any]) None [source]#
加载整个运行时的状态,包括所有托管的代理。该状态应与
save_state()
返回的状态相同。- 参数:
state (Mapping[str, Any]) – 已保存的状态。
- async publish_message(message: Any, topic_id: TopicId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) None [source]#
将消息发布到给定命名空间中的所有代理,或者如果未提供命名空间,则发布到发送者的命名空间。
发布消息后,不应有任何响应。
- 参数:
message (Any) – 要发布的消息。
topic_id (TopicId) – 要将消息发布到的主题。
sender (AgentId | None, optional) – 发送消息的代理。默认为 None。
cancellation_token (CancellationToken | None, optional) – 用于取消正在进行中的操作的令牌。默认为 None。
message_id (str | None, optional) – 消息 ID。如果为 None,将生成新的消息 ID。默认为 None。此消息 ID 必须是唯一的,建议使用 UUID。
- Raises:
UndeliverableException – 如果消息无法传递。
- async register_factory(type: str | AgentType, agent_factory: Callable[[], T | Awaitable[T]], *, expected_class: type[T] | None = None) AgentType [source]#
使用与特定类型关联的运行时注册代理工厂。该类型必须是唯一的。此 API 不添加任何订阅。
注意
这是一个底层 API,通常应该使用 agent 类的 register 方法来代替,因为它也会自动处理订阅。
示例
from dataclasses import dataclass from autogen_core import AgentRuntime, MessageContext, RoutedAgent, event from autogen_core.models import UserMessage @dataclass class MyMessage: content: str class MyAgent(RoutedAgent): def __init__(self) -> None: super().__init__("My core agent") @event async def handler(self, message: UserMessage, context: MessageContext) -> None: print("Event received: ", message.content) async def my_agent_factory(): return MyAgent() async def main() -> None: runtime: AgentRuntime = ... # type: ignore await runtime.register_factory("my_agent", lambda: MyAgent()) import asyncio asyncio.run(main())
- 参数:
type (str) – 此工厂创建的 agent 的类型。它与 agent 类名不同。type 参数用于区分不同的工厂函数,而不是 agent 类。
agent_factory (Callable[[], T]) – 创建 agent 的工厂,其中 T 是一个具体的 Agent 类型。在工厂内部,使用 autogen_core.AgentInstantiationContext 来访问诸如当前运行时和 agent ID 之类的变量。
expected_class (type[T] | None, optional) – agent 的预期类,用于运行时的工厂验证。默认为 None。如果为 None,则不执行验证。
- async remove_subscription(id: str) None [source]#
从运行时删除一个订阅
- 参数:
id (str) – 要删除的订阅的 ID
- Raises:
LookupError – 如果订阅不存在
- async save_state() Mapping[str, Any] [source]#
保存整个运行时的状态,包括所有托管的 agent。恢复状态的唯一方法是将其传递给
load_state()
。状态的结构是实现定义的,可以是任何 JSON 可序列化的对象。
- 返回:
Mapping[str, Any] – 保存的状态。
- async send_message(message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) Any [source]#
向 agent 发送消息并获取响应。
- 参数:
message (Any) – 要发送的消息。
recipient (AgentId) – 要发送消息的 agent。
sender (AgentId | None, optional) – 发送消息的 Agent。如果消息不是由 agent 发送的,例如直接从外部发送到运行时,则应该仅为 None。默认为 None。
cancellation_token (CancellationToken | None, optional) – 用于取消正在进行的任务的令牌。默认为 None。
- Raises:
CantHandleException – 如果接收者无法处理消息。
UndeliverableException – 如果消息无法传递。
Other – 接收者引发的任何其他异常。
- 返回:
Any – 来自 agent 的响应。
- async stop_when_signal(signals: Sequence[Signals] = (signal.SIGTERM, signal.SIGINT)) None [source]#
在收到信号时停止运行时。
- async try_get_underlying_agent_instance(id: AgentId, type: Type[T] = Agent) T [source]#
尝试通过名称和命名空间获取底层 agent 实例。通常不建议这样做(因此名称很长),但在某些情况下可能有用。
如果无法访问底层 agent,则会引发异常。
- 参数:
id (AgentId) – agent id。
type (Type[T], optional) – agent 的预期类型。默认为 Agent。
- 返回:
T – 具体 agent 实例。
- Raises:
LookupError – 如果未找到 agent。
NotAccessibleError – 如果 agent 不可访问,例如,如果它位于远程位置。
TypeError – 如果 agent 不是预期的类型。
- class GrpcWorkerAgentRuntimeHost(address: str, extra_grpc_config: Sequence[Tuple[str, Any]] | None = None)[source]#
基类:
object
- class GrpcWorkerAgentRuntimeHostServicer[source]#
基类:
AgentRpcServicer
一个 gRPC servicer,用于为代理托管消息传递服务。
- async AddSubscription(request: AddSubscriptionRequest, context: ServicerContext[AddSubscriptionRequest, AddSubscriptionResponse]) AddSubscriptionResponse [source]#
缺少 .proto 文件中的相关文档注释。
- async GetSubscriptions(request: GetSubscriptionsRequest, context: ServicerContext[GetSubscriptionsRequest, GetSubscriptionsResponse]) GetSubscriptionsResponse [source]#
缺少 .proto 文件中的相关文档注释。
- async OpenChannel(request_iterator: AsyncIterator[Message], context: ServicerContext[Message, Message]) AsyncIterator[Message] [source]#
缺少 .proto 文件中的相关文档注释。
- async OpenControlChannel(request_iterator: AsyncIterator[ControlMessage], context: ServicerContext[ControlMessage, ControlMessage]) AsyncIterator[ControlMessage] [source]#
缺少 .proto 文件中的相关文档注释。