autogen_ext.models.cache
- class ChatCompletionCache(client: ChatCompletionClient, store: CacheStore[CreateResult | List[str | CreateResult]] | None = None)
Bases: ChatCompletionClient, Component[ChatCompletionCacheConfig]
A wrapper around a ChatCompletionClient that caches creation results from the underlying client. Cache hits do not count toward the token usage of the original client.
Typical usage:
Take disk caching with the openai client as an example. First install autogen-ext with the required packages:
pip install -U "autogen-ext[openai, diskcache]"
Then use it as follows:
import asyncio
import tempfile

from autogen_core.models import UserMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.models.cache import ChatCompletionCache, CHAT_CACHE_VALUE_TYPE
from autogen_ext.cache_store.diskcache import DiskCacheStore
from diskcache import Cache


async def main():
    with tempfile.TemporaryDirectory() as tmpdirname:
        # Initialize the original client
        openai_model_client = OpenAIChatCompletionClient(model="gpt-4o")

        # Then initialize the CacheStore, in this case with diskcache.Cache.
        # You can also use redis like:
        # from autogen_ext.cache_store.redis import RedisStore
        # import redis
        # redis_instance = redis.Redis()
        # cache_store = RedisStore[CHAT_CACHE_VALUE_TYPE](redis_instance)
        cache_store = DiskCacheStore[CHAT_CACHE_VALUE_TYPE](Cache(tmpdirname))
        cache_client = ChatCompletionCache(openai_model_client, cache_store)

        response = await cache_client.create([UserMessage(content="Hello, how are you?", source="user")])
        print(response)  # Should print response from OpenAI
        response = await cache_client.create([UserMessage(content="Hello, how are you?", source="user")])
        print(response)  # Should print cached response


asyncio.run(main())
For Redis caching:
import asyncio

from autogen_core.models import UserMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.models.cache import ChatCompletionCache, CHAT_CACHE_VALUE_TYPE
from autogen_ext.cache_store.redis import RedisStore
import redis


async def main():
    # Initialize the original client
    openai_model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Initialize Redis cache store
    redis_instance = redis.Redis()
    cache_store = RedisStore[CHAT_CACHE_VALUE_TYPE](redis_instance)
    cache_client = ChatCompletionCache(openai_model_client, cache_store)

    response = await cache_client.create([UserMessage(content="Hello, how are you?", source="user")])
    print(response)  # Should print response from OpenAI
    response = await cache_client.create([UserMessage(content="Hello, how are you?", source="user")])
    print(response)  # Should print cached response


asyncio.run(main())
For streaming with Redis caching:
import asyncio

from autogen_core.models import UserMessage, CreateResult
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.models.cache import ChatCompletionCache, CHAT_CACHE_VALUE_TYPE
from autogen_ext.cache_store.redis import RedisStore
import redis


async def main():
    # Initialize the original client
    openai_model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Initialize Redis cache store
    redis_instance = redis.Redis()
    cache_store = RedisStore[CHAT_CACHE_VALUE_TYPE](redis_instance)
    cache_client = ChatCompletionCache(openai_model_client, cache_store)

    # First streaming call
    async for chunk in cache_client.create_stream(
        [UserMessage(content="List all countries in Africa", source="user")]
    ):
        if isinstance(chunk, CreateResult):
            print("\n")
            print("Cached: ", chunk.cached)  # Should print False
        else:
            print(chunk, end="")

    # Second streaming call (cached)
    async for chunk in cache_client.create_stream(
        [UserMessage(content="List all countries in Africa", source="user")]
    ):
        if isinstance(chunk, CreateResult):
            print("\n")
            print("Cached: ", chunk.cached)  # Should print True
        else:
            print(chunk, end="")


asyncio.run(main())
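You can now use cache_client just as you would the original client, but with caching enabled.
- Parameters:
client (ChatCompletionClient) – The original ChatCompletionClient to wrap.
store (CacheStore) – A store object that implements get and set methods. The user is responsible for managing the store's lifecycle and for clearing it if needed. Defaults to an in-memory cache.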
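The store parameter only needs to offer a get and a set method. A minimal sketch of such an object is shown below using the standard library only. The class name DictStore, the optional default argument on get, and the clear helper are assumptions made for illustration. The sketch does not subclass the real CacheStore, so check the CacheStore reference for the exact signatures before passing a custom store to ChatCompletionCache.

```python
from typing import Dict, Generic, Optional, TypeVar

T = TypeVar("T")


class DictStore(Generic[T]):
    """Hypothetical in-memory store exposing a get/set pair."""

    def __init__(self) -> None:
        self._data: Dict[str, T] = {}

    def get(self, key: str, default: Optional[T] = None) -> Optional[T]:
        # Return the stored value, or the default on a cache miss.
        # (The default argument is an assumption, not taken from the text above.)
        return self._data.get(key, default)

    def set(self, key: str, value: T) -> None:
        # Overwrite any previous value stored under the same key.
        self._data[key] = value

    def clear(self) -> None:
        # Clearing is left to the caller, so expose it explicitly.
        self._data.clear()
```

Because lifecycle management is left to the user, a store like this lives exactly as long as the object that holds it. A disk- or Redis-backed store outlives the process and has to be cleared deliberately.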
- component_type: ClassVar[ComponentType] = 'chat_completion_cache'
The logical type of the component.
- component_provider_override: ClassVar[str | None] = 'autogen_ext.models.cache.ChatCompletionCache'
Overrides the provider string for the component. Use this to prevent internal module names from becoming part of the module name.
- component_config_schema
alias of ChatCompletionCacheConfig
- async create(messages: Sequence[Annotated[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]], *, tools: Sequence[Tool | ToolSchema] = [], tool_choice: Tool | Literal['auto', 'required', 'none'] = 'auto', json_output: bool | type[BaseModel] | None = None, extra_create_args: Mapping[str, Any] = {}, cancellation_token: CancellationToken | None = None) → CreateResult
Cached version of ChatCompletionClient.create. If the result of a call to create has been cached, it is returned immediately without invoking the underlying client.
NOTE: cancellation_token is ignored for cached results.
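The cache-first behaviour described for create can be illustrated with a small standard-library sketch. Everything in it is hypothetical: FakeClient stands in for an underlying client, CachingWrapper is not the library's implementation, and hashing a JSON dump of the request is only an assumed keying scheme. It shows that a repeated request is answered from the cache, so the underlying client is called once.

```python
import asyncio
import hashlib
import json
from typing import Any, Dict, List


class FakeClient:
    """Hypothetical stand-in for an underlying client; counts real calls."""

    def __init__(self) -> None:
        self.calls = 0

    async def create(self, messages: List[Dict[str, Any]]) -> str:
        self.calls += 1
        return f"reply to {messages[-1]['content']}"


class CachingWrapper:
    """Sketch of a cache-first create(); not the library's implementation."""

    def __init__(self, client: FakeClient) -> None:
        self._client = client
        self._store: Dict[str, str] = {}

    @staticmethod
    def _key(messages: List[Dict[str, Any]]) -> str:
        # Assumed keying scheme: hash a canonical JSON dump of the request.
        payload = json.dumps(messages, sort_keys=True)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    async def create(self, messages: List[Dict[str, Any]]) -> str:
        key = self._key(messages)
        if key in self._store:
            # Cache hit: return at once, the underlying client is not called.
            return self._store[key]
        # Cache miss: call the underlying client and remember its result.
        result = await self._client.create(messages)
        self._store[key] = result
        return result


async def demo() -> int:
    client = FakeClient()
    cached = CachingWrapper(client)
    request = [{"source": "user", "content": "Hello, how are you?"}]
    first = await cached.create(request)
    second = await cached.create(request)
    assert first == second
    # Number of calls that actually reached the underlying client.
    return client.calls
```

Running asyncio.run(demo()) reports how many calls reached the fake client. Since a hit never reaches the underlying client, there is no in-flight request to cancel, which fits the note that cancellation_token is ignored for cached results.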
- create_stream(messages: Sequence[Annotated[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]], *, tools: Sequence[Tool | ToolSchema] = [], tool_choice: Tool | Literal['auto', 'required', 'none'] = 'auto', json_output: bool | type[BaseModel] | None = None, extra_create_args: Mapping[str, Any] = {}, cancellation_token: CancellationToken | None = None) → AsyncGenerator[str | CreateResult, None]
Cached version of ChatCompletionClient.create_stream. If the result of a call to create_stream has been cached, it is returned without streaming from the underlying client.
NOTE: cancellation_token is ignored for cached results.
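One way to picture a cached stream is as a recorded list of chunks that is replayed on a later call. The store type in the class signature includes List[str | CreateResult], which is consistent with that picture, but the sketch below is only an illustration under that assumption. FakeStreamingClient, StreamCache, and keying on the raw prompt string are all hypothetical.

```python
import asyncio
from typing import AsyncGenerator, Dict, List


class FakeStreamingClient:
    """Hypothetical stand-in that streams a prompt back word by word."""

    def __init__(self) -> None:
        self.calls = 0

    async def create_stream(self, prompt: str) -> AsyncGenerator[str, None]:
        self.calls += 1
        for word in prompt.split():
            yield word


class StreamCache:
    """Sketch of record-and-replay stream caching; not the library's code."""

    def __init__(self, client: FakeStreamingClient) -> None:
        self._client = client
        self._store: Dict[str, List[str]] = {}

    async def create_stream(self, prompt: str) -> AsyncGenerator[str, None]:
        cached = self._store.get(prompt)
        if cached is not None:
            # Cache hit: replay the recorded chunks, no upstream streaming.
            for chunk in cached:
                yield chunk
            return
        recorded: List[str] = []
        async for chunk in self._client.create_stream(prompt):
            recorded.append(chunk)
            yield chunk
        # Store only once the stream has finished, so a stream that the
        # consumer abandons midway is not cached.
        self._store[prompt] = recorded


async def collect(cache: StreamCache, prompt: str) -> List[str]:
    return [chunk async for chunk in cache.create_stream(prompt)]


async def demo_stream():
    client = FakeStreamingClient()
    cache = StreamCache(client)
    first = await collect(cache, "List all countries")
    second = await collect(cache, "List all countries")
    return first, second, client.calls
```

In this sketch the consumer sees the same sequence of chunks on both calls, while only the first call reaches the fake client.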
- actual_usage() → RequestUsage
- count_tokens(messages: Sequence[Annotated[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]], *, tools: Sequence[Tool | ToolSchema] = []) → int
- property capabilities: ModelCapabilities
- remaining_tokens(messages: Sequence[Annotated[SystemMessage | UserMessage | AssistantMessage | FunctionExecutionResultMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]], *, tools: Sequence[Tool | ToolSchema] = []) → int
- total_usage() → RequestUsage