autogen_ext.models.cache#

class ChatCompletionCache(client: ChatCompletionClient, store: CacheStore[CreateResult | List[str | CreateResult]] | None = None)[source]#

基类:ChatCompletionClient, Component[ChatCompletionCacheConfig]

封装 ChatCompletionClient 的包装器,该包装器缓存来自底层客户端的创建结果。缓存命中不会增加原始客户端的令牌使用量。

典型用法

以使用磁盘缓存和 openai 客户端为例。首先安装 autogen-ext 和所需的包

pip install -U "autogen-ext[openai, diskcache]"

并按以下方式使用

import asyncio
import tempfile

from autogen_core.models import UserMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.models.cache import ChatCompletionCache, CHAT_CACHE_VALUE_TYPE
from autogen_ext.cache_store.diskcache import DiskCacheStore
from diskcache import Cache


async def main():
    """Send the same request twice through a disk-backed ChatCompletionCache.

    The first call should be answered by OpenAI; the identical second call
    should be answered from the cache.
    """
    with tempfile.TemporaryDirectory() as tmpdirname:
        # Initialize the original client
        openai_model_client = OpenAIChatCompletionClient(model="gpt-4o")

        # Then initialize the CacheStore, in this case with diskcache.Cache.
        # You can also use redis like:
        # from autogen_ext.cache_store.redis import RedisStore
        # import redis
        # redis_instance = redis.Redis()
        # cache_store = RedisStore[CHAT_CACHE_VALUE_TYPE](redis_instance)
        cache_store = DiskCacheStore[CHAT_CACHE_VALUE_TYPE](Cache(tmpdirname))
        cache_client = ChatCompletionCache(openai_model_client, cache_store)

        response = await cache_client.create([UserMessage(content="Hello, how are you?", source="user")])
        print(response)  # Should print response from OpenAI
        response = await cache_client.create([UserMessage(content="Hello, how are you?", source="user")])
        print(response)  # Should print cached response


asyncio.run(main())

用于 Redis 缓存

import asyncio

from autogen_core.models import UserMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.models.cache import ChatCompletionCache, CHAT_CACHE_VALUE_TYPE
from autogen_ext.cache_store.redis import RedisStore
import redis


async def main():
    """Issue one prompt twice through a Redis-backed ChatCompletionCache."""
    # The underlying client that talks to OpenAI
    openai_model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Wrap it with a cache whose entries live in Redis
    cache_store = RedisStore[CHAT_CACHE_VALUE_TYPE](redis.Redis())
    cache_client = ChatCompletionCache(openai_model_client, cache_store)

    # Pass 1 should print the response from OpenAI,
    # pass 2 should print the cached response.
    for _ in range(2):
        prompt = [UserMessage(content="Hello, how are you?", source="user")]
        response = await cache_client.create(prompt)
        print(response)


asyncio.run(main())

结合 Redis 缓存使用流式传输

import asyncio

from autogen_core.models import UserMessage, CreateResult
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.models.cache import ChatCompletionCache, CHAT_CACHE_VALUE_TYPE
from autogen_ext.cache_store.redis import RedisStore
import redis


async def main():
    """Stream one prompt twice through a Redis-backed ChatCompletionCache."""
    # The underlying client that talks to OpenAI
    openai_model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Wrap it with a cache whose entries live in Redis
    cache_store = RedisStore[CHAT_CACHE_VALUE_TYPE](redis.Redis())
    cache_client = ChatCompletionCache(openai_model_client, cache_store)

    # Pass 1 is the first streaming call ("Cached: " should print False);
    # pass 2 repeats it and should be served from the cache (True).
    for _ in range(2):
        prompt = [UserMessage(content="List all countries in Africa", source="user")]
        async for chunk in cache_client.create_stream(prompt):
            if not isinstance(chunk, CreateResult):
                # Plain text chunk: echo it without a trailing newline
                print(chunk, end="")
                continue
            # The CreateResult item carries the cache-hit flag
            print("\n")
            print("Cached: ", chunk.cached)


asyncio.run(main())

您现在可以像使用原始客户端一样使用 cache_client,但已启用缓存。

参数:
  • client (ChatCompletionClient) – 要封装的原始 ChatCompletionClient。

  • store (CacheStore) – 实现 get 和 set 方法的存储对象。用户负责管理存储的生命周期和清除它(如果需要)。默认为使用内存缓存。

component_type: ClassVar[ComponentType] = 'chat_completion_cache'#

组件的逻辑类型。

component_provider_override: ClassVar[str | None] = 'autogen_ext.models.cache.ChatCompletionCache'#

覆盖组件的提供者字符串。这应该用于防止内部模块名称成为模块名称的一部分。

component_config_schema#

别名为 ChatCompletionCacheConfig

async create(messages: Sequence[Annotated[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]], *, tools: Sequence[Tool | ToolSchema] = [], tool_choice: Tool | Literal['auto', 'required', 'none'] = 'auto', json_output: bool | type[BaseModel] | None = None, extra_create_args: Mapping[str, Any] = {}, cancellation_token: CancellationToken | None = None) CreateResult[source]#

ChatCompletionClient.create 的缓存版本。如果对 create 的调用结果已被缓存,它将立即返回,而不会调用底层客户端。

注意:cancellation_token 对于缓存结果会被忽略。

create_stream(messages: Sequence[Annotated[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]], *, tools: Sequence[Tool | ToolSchema] = [], tool_choice: Tool | Literal['auto', 'required', 'none'] = 'auto', json_output: bool | type[BaseModel] | None = None, extra_create_args: Mapping[str, Any] = {}, cancellation_token: CancellationToken | None = None) AsyncGenerator[str | CreateResult, None][source]#

ChatCompletionClient.create_stream 的缓存版本。如果对 create_stream 的调用结果已被缓存,它将直接返回,而不会从底层客户端进行流式传输。

注意:cancellation_token 对于缓存结果会被忽略。

async close() None[source]#
actual_usage() RequestUsage[source]#
count_tokens(messages: Sequence[Annotated[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]], *, tools: Sequence[Tool | ToolSchema] = []) int[source]#
property capabilities: ModelCapabilities#
property model_info: ModelInfo#
remaining_tokens(messages: Sequence[Annotated[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]], *, tools: Sequence[Tool | ToolSchema] = []) int[source]#
total_usage() RequestUsage[source]#
_to_config() ChatCompletionCacheConfig[source]#

转储创建与此实例配置匹配的组件新实例所需的配置。

返回:

T – 组件的配置。

classmethod _from_config(config: ChatCompletionCacheConfig) Self[source]#

从配置对象创建组件的新实例。

参数:

config (T) – 配置对象。

返回:

Self – 组件的新实例。