内存和 RAG#
在某些用例中,维护一个有用的事实*存储* 非常有价值,这些事实可以在特定步骤之前智能地添加到代理的上下文中。这里的典型用例是 RAG 模式,其中查询用于从数据库中检索相关信息,然后将其添加到代理的上下文中。
AgentChat 提供了一个 Memory
协议,可以扩展该协议以提供此功能。关键方法是 query
、update_context
、add
、clear
和 close
。
add
:将新条目添加到内存存储query
:从内存存储中检索相关信息update_context
:通过添加检索到的信息来改变代理的内部model_context
(在AssistantAgent
类中使用)clear
:清除内存存储中的所有条目close
:清理内存存储使用的任何资源
ListMemory 示例#
{py:class}~autogen_core.memory.ListMemory 作为 {py:class}~autogen_core.memory.Memory 协议的示例实现提供。它是一个简单的基于列表的内存实现,以时间顺序维护内存,并将最新的内存附加到模型的上下文中。该实现旨在简单明了且可预测,易于理解和调试。在以下示例中,我们将使用 ListMemory 来维护用户偏好的记忆库,并演示如何使用它来为代理响应提供随时间推移的一致的上下文。
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.ui import Console
from autogen_core.memory import ListMemory, MemoryContent, MemoryMimeType
from autogen_ext.models.openai import OpenAIChatCompletionClient
# Initialize user memory
user_memory = ListMemory()
# Add user preferences to memory
await user_memory.add(MemoryContent(content="The weather should be in metric units", mime_type=MemoryMimeType.TEXT))
await user_memory.add(MemoryContent(content="Meal recipe must be vegan", mime_type=MemoryMimeType.TEXT))
async def get_weather(city: str, units: str = "imperial") -> str:
if units == "imperial":
return f"The weather in {city} is 73 °F and Sunny."
elif units == "metric":
return f"The weather in {city} is 23 °C and Sunny."
else:
return f"Sorry, I don't know the weather in {city}."
assistant_agent = AssistantAgent(
name="assistant_agent",
model_client=OpenAIChatCompletionClient(
model="gpt-4o-2024-08-06",
),
tools=[get_weather],
memory=[user_memory],
)
# Run the agent with a task.
stream = assistant_agent.run_stream(task="What is the weather in New York?")
await Console(stream)
我们可以检查到 assistant_agent
model_context 实际上已使用检索到的内存条目进行更新。transform
方法用于将检索到的内存条目格式化为代理可以使用的字符串。在这种情况下,我们只是将每个内存条目的内容连接成一个字符串。
await assistant_agent._model_context.get_messages()
我们在上面看到,天气以摄氏度返回,如用户偏好中所述。
同样,假设我们提出一个关于生成膳食计划的单独问题,代理能够从内存存储中检索相关信息并提供个性化的(纯素)响应。
stream = assistant_agent.run_stream(task="Write brief meal recipe with broth")
await Console(stream)
自定义内存存储(向量数据库等)#
您可以基于 Memory
协议来构建更复杂的内存存储。例如,您可以实现一个自定义内存存储,该存储使用向量数据库来存储和检索信息,或者实现一个内存存储,该存储使用机器学习模型来根据用户的偏好生成个性化的响应等。
具体来说,您需要重载 add
、query
和 update_context
方法以实现所需的功能,并将内存存储传递给您的代理。
目前,以下示例内存存储作为 autogen_ext
扩展包的一部分提供。
autogen_ext.memory.chromadb.ChromaDBVectorMemory
:一个使用向量数据库来存储和检索信息的内存存储。
import os
from pathlib import Path
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.ui import Console
from autogen_core.memory import MemoryContent, MemoryMimeType
from autogen_ext.memory.chromadb import ChromaDBVectorMemory, PersistentChromaDBVectorMemoryConfig
from autogen_ext.models.openai import OpenAIChatCompletionClient
# Initialize ChromaDB memory with custom config
chroma_user_memory = ChromaDBVectorMemory(
config=PersistentChromaDBVectorMemoryConfig(
collection_name="preferences",
persistence_path=os.path.join(str(Path.home()), ".chromadb_autogen"),
k=2, # Return top k results
score_threshold=0.4, # Minimum similarity score
)
)
# a HttpChromaDBVectorMemoryConfig is also supported for connecting to a remote ChromaDB server
# Add user preferences to memory
await chroma_user_memory.add(
MemoryContent(
content="The weather should be in metric units",
mime_type=MemoryMimeType.TEXT,
metadata={"category": "preferences", "type": "units"},
)
)
await chroma_user_memory.add(
MemoryContent(
content="Meal recipe must be vegan",
mime_type=MemoryMimeType.TEXT,
metadata={"category": "preferences", "type": "dietary"},
)
)
model_client = OpenAIChatCompletionClient(
model="gpt-4o",
)
# Create assistant agent with ChromaDB memory
assistant_agent = AssistantAgent(
name="assistant_agent",
model_client=model_client,
tools=[get_weather],
memory=[chroma_user_memory],
)
stream = assistant_agent.run_stream(task="What is the weather in New York?")
await Console(stream)
await model_client.close()
await chroma_user_memory.close()
请注意,您还可以序列化 ChromaDBVectorMemory 并将其保存到磁盘。
chroma_user_memory.dump_component().model_dump_json()
RAG 代理:将所有内容整合在一起#
RAG(检索增强生成)模式在构建 AI 系统中很常见,它包含两个不同的阶段
索引:加载文档,对其进行分块,并将其存储在向量数据库中
检索:在对话运行时查找和使用相关块
在我们之前的示例中,我们手动将项目添加到内存中并将其传递给我们的代理。在实践中,索引过程通常是自动化的,并且基于更大的文档来源,例如产品文档、内部文件或知识库。
注意:RAG 系统的质量取决于分块和检索过程的质量(模型、嵌入等)。您可能需要试验更高级的分块和检索模型才能获得最佳结果。
构建一个简单的 RAG 代理#
首先,让我们创建一个简单的文档索引器,我们将使用它来加载文档,对其进行分块,并将其存储在 ChromaDBVectorMemory
内存存储中。
import re
from typing import List
import aiofiles
import aiohttp
from autogen_core.memory import Memory, MemoryContent, MemoryMimeType
class SimpleDocumentIndexer:
"""Basic document indexer for AutoGen Memory."""
def __init__(self, memory: Memory, chunk_size: int = 1500) -> None:
self.memory = memory
self.chunk_size = chunk_size
async def _fetch_content(self, source: str) -> str:
"""Fetch content from URL or file."""
if source.startswith(("http://", "https://")):
async with aiohttp.ClientSession() as session:
async with session.get(source) as response:
return await response.text()
else:
async with aiofiles.open(source, "r", encoding="utf-8") as f:
return await f.read()
def _strip_html(self, text: str) -> str:
"""Remove HTML tags and normalize whitespace."""
text = re.sub(r"<[^>]*>", " ", text)
text = re.sub(r"\s+", " ", text)
return text.strip()
def _split_text(self, text: str) -> List[str]:
"""Split text into fixed-size chunks."""
chunks: list[str] = []
# Just split text into fixed-size chunks
for i in range(0, len(text), self.chunk_size):
chunk = text[i : i + self.chunk_size]
chunks.append(chunk.strip())
return chunks
async def index_documents(self, sources: List[str]) -> int:
"""Index documents into memory."""
total_chunks = 0
for source in sources:
try:
content = await self._fetch_content(source)
# Strip HTML if content appears to be HTML
if "<" in content and ">" in content:
content = self._strip_html(content)
chunks = self._split_text(content)
for i, chunk in enumerate(chunks):
await self.memory.add(
MemoryContent(
content=chunk, mime_type=MemoryMimeType.TEXT, metadata={"source": source, "chunk_index": i}
)
)
total_chunks += len(chunks)
except Exception as e:
print(f"Error indexing {source}: {str(e)}")
return total_chunks
现在让我们将我们的索引器与 ChromaDBVectorMemory 一起使用来构建一个完整的 RAG 代理
import os
from pathlib import Path
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.ui import Console
from autogen_ext.memory.chromadb import ChromaDBVectorMemory, PersistentChromaDBVectorMemoryConfig
from autogen_ext.models.openai import OpenAIChatCompletionClient
# Initialize vector memory
rag_memory = ChromaDBVectorMemory(
config=PersistentChromaDBVectorMemoryConfig(
collection_name="autogen_docs",
persistence_path=os.path.join(str(Path.home()), ".chromadb_autogen"),
k=3, # Return top 3 results
score_threshold=0.4, # Minimum similarity score
)
)
await rag_memory.clear() # Clear existing memory
# Index AutoGen documentation
async def index_autogen_docs() -> None:
indexer = SimpleDocumentIndexer(memory=rag_memory)
sources = [
"https://raw.githubusercontent.com/microsoft/autogen/main/README.md",
"https://msdocs.cn/autogen/stable/user-guide/agentchat-user-guide/tutorial/agents.html",
"https://msdocs.cn/autogen/stable/user-guide/agentchat-user-guide/tutorial/teams.html",
"https://msdocs.cn/autogen/stable/user-guide/agentchat-user-guide/tutorial/termination.html",
]
chunks: int = await indexer.index_documents(sources)
print(f"Indexed {chunks} chunks from {len(sources)} AutoGen documents")
await index_autogen_docs()
Indexed 72 chunks from 4 AutoGen documents
# Create our RAG assistant agent
rag_assistant = AssistantAgent(
name="rag_assistant", model_client=OpenAIChatCompletionClient(model="gpt-4o"), memory=[rag_memory]
)
# Ask questions about AutoGen
stream = rag_assistant.run_stream(task="What is AgentChat?")
await Console(stream)
# Remember to close the memory when done
await rag_memory.close()
---------- user ----------
What is AgentChat?
Query results: results=[MemoryContent(content='ng OpenAI\'s GPT-4o model. See [other supported models](https://msdocs.cn/autogen/stable/user-guide/agentchat-user-guide/tutorial/models.html). ```python import asyncio from autogen_agentchat.agents import AssistantAgent from autogen_ext.models.openai import OpenAIChatCompletionClient async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") agent = AssistantAgent("assistant", model_client=model_client) print(await agent.run(task="Say \'Hello World!\'")) await model_client.close() asyncio.run(main()) ``` ### Web Browsing Agent Team Create a group chat team with a web surfer agent and a user proxy agent for web browsing tasks. You need to install [playwright](https://playwright.net.cn/python/docs/library). ```python # pip install -U autogen-agentchat autogen-ext[openai,web-surfer] # playwright install import asyncio from autogen_agentchat.agents import UserProxyAgent from autogen_agentchat.conditions import TextMentionTermination from autogen_agentchat.teams import RoundRobinGroupChat from autogen_agentchat.ui import Console from autogen_ext.models.openai import OpenAIChatCompletionClient from autogen_ext.agents.web_surfer import MultimodalWebSurfer async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") # The web surfer will open a Chromium browser window to perform web browsing tasks. web_surfer = MultimodalWebSurfer("web_surfer", model_client, headless=False, animate_actions=True) # The user proxy agent is used to ge', mime_type='MemoryMimeType.TEXT', metadata={'chunk_index': 1, 'mime_type': 'MemoryMimeType.TEXT', 'source': 'https://raw.githubusercontent.com/microsoft/autogen/main/README.md', 'score': 0.48810458183288574, 'id': '16088e03-0153-4da3-9dec-643b39c549f5'}), MemoryContent(content='els_usage=None content='AutoGen is a programming framework for building multi-agent applications.' type='ToolCallSummaryMessage' The call to the on_messages() method returns a Response that contains the agent’s final response in the chat_message attribute, as well as a list of inner messages in the inner_messages attribute, which stores the agent’s “thought process” that led to the final response. Note It is important to note that on_messages() will update the internal state of the agent – it will add the messages to the agent’s history. So you should call this method with new messages. You should not repeatedly call this method with the same messages or the complete history. Note Unlike in v0.2 AgentChat, the tools are executed by the same agent directly within the same call to on_messages() . By default, the agent will return the result of the tool call as the final response. You can also call the run() method, which is a convenience method that calls on_messages() . It follows the same interface as Teams and returns a TaskResult object. Multi-Modal Input # The AssistantAgent can handle multi-modal input by providing the input as a MultiModalMessage . from io import BytesIO import PIL import requests from autogen_agentchat.messages import MultiModalMessage from autogen_core import Image # Create a multi-modal message with random image and text. pil_image = PIL . Image . open ( BytesIO ( requests . get ( "https://picsum.photos/300/200" ) . content )', mime_type='MemoryMimeType.TEXT', metadata={'chunk_index': 3, 'mime_type': 'MemoryMimeType.TEXT', 'source': 'https://msdocs.cn/autogen/stable/user-guide/agentchat-user-guide/tutorial/agents.html', 'score': 0.4665141701698303, 'id': '3d603b62-7cab-4f74-b671-586fe36306f2'}), MemoryContent(content='AgentChat Termination Termination # In the previous section, we explored how to define agents, and organize them into teams that can solve tasks. However, a run can go on forever, and in many cases, we need to know when to stop them. This is the role of the termination condition. AgentChat supports several termination condition by providing a base TerminationCondition class and several implementations that inherit from it. A termination condition is a callable that takes a sequence of BaseAgentEvent or BaseChatMessage objects since the last time the condition was called , and returns a StopMessage if the conversation should be terminated, or None otherwise. Once a termination condition has been reached, it must be reset by calling reset() before it can be used again. Some important things to note about termination conditions: They are stateful but reset automatically after each run ( run() or run_stream() ) is finished. They can be combined using the AND and OR operators. Note For group chat teams (i.e., RoundRobinGroupChat , SelectorGroupChat , and Swarm ), the termination condition is called after each agent responds. While a response may contain multiple inner messages, the team calls its termination condition just once for all the messages from a single response. So the condition is called with the “delta sequence” of messages since the last time it was called. Built-In Termination Conditions: MaxMessageTermination : Stops after a specified number of messages have been produced,', mime_type='MemoryMimeType.TEXT', metadata={'chunk_index': 1, 'mime_type': 'MemoryMimeType.TEXT', 'source': 'https://msdocs.cn/autogen/stable/user-guide/agentchat-user-guide/tutorial/termination.html', 'score': 0.461774212772051, 'id': '699ef490-d108-4cd3-b629-c1198d6b78ba'})]
---------- rag_assistant ----------
[MemoryContent(content='ng OpenAI\'s GPT-4o model. See [other supported models](https://msdocs.cn/autogen/stable/user-guide/agentchat-user-guide/tutorial/models.html). ```python import asyncio from autogen_agentchat.agents import AssistantAgent from autogen_ext.models.openai import OpenAIChatCompletionClient async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") agent = AssistantAgent("assistant", model_client=model_client) print(await agent.run(task="Say \'Hello World!\'")) await model_client.close() asyncio.run(main()) ``` ### Web Browsing Agent Team Create a group chat team with a web surfer agent and a user proxy agent for web browsing tasks. You need to install [playwright](https://playwright.net.cn/python/docs/library). ```python # pip install -U autogen-agentchat autogen-ext[openai,web-surfer] # playwright install import asyncio from autogen_agentchat.agents import UserProxyAgent from autogen_agentchat.conditions import TextMentionTermination from autogen_agentchat.teams import RoundRobinGroupChat from autogen_agentchat.ui import Console from autogen_ext.models.openai import OpenAIChatCompletionClient from autogen_ext.agents.web_surfer import MultimodalWebSurfer async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") # The web surfer will open a Chromium browser window to perform web browsing tasks. web_surfer = MultimodalWebSurfer("web_surfer", model_client, headless=False, animate_actions=True) # The user proxy agent is used to ge', mime_type='MemoryMimeType.TEXT', metadata={'chunk_index': 1, 'mime_type': 'MemoryMimeType.TEXT', 'source': 'https://raw.githubusercontent.com/microsoft/autogen/main/README.md', 'score': 0.48810458183288574, 'id': '16088e03-0153-4da3-9dec-643b39c549f5'}), MemoryContent(content='els_usage=None content='AutoGen is a programming framework for building multi-agent applications.' type='ToolCallSummaryMessage' The call to the on_messages() method returns a Response that contains the agent’s final response in the chat_message attribute, as well as a list of inner messages in the inner_messages attribute, which stores the agent’s “thought process” that led to the final response. Note It is important to note that on_messages() will update the internal state of the agent – it will add the messages to the agent’s history. So you should call this method with new messages. You should not repeatedly call this method with the same messages or the complete history. Note Unlike in v0.2 AgentChat, the tools are executed by the same agent directly within the same call to on_messages() . By default, the agent will return the result of the tool call as the final response. You can also call the run() method, which is a convenience method that calls on_messages() . It follows the same interface as Teams and returns a TaskResult object. Multi-Modal Input # The AssistantAgent can handle multi-modal input by providing the input as a MultiModalMessage . from io import BytesIO import PIL import requests from autogen_agentchat.messages import MultiModalMessage from autogen_core import Image # Create a multi-modal message with random image and text. pil_image = PIL . Image . open ( BytesIO ( requests . get ( "https://picsum.photos/300/200" ) . content )', mime_type='MemoryMimeType.TEXT', metadata={'chunk_index': 3, 'mime_type': 'MemoryMimeType.TEXT', 'source': 'https://msdocs.cn/autogen/stable/user-guide/agentchat-user-guide/tutorial/agents.html', 'score': 0.4665141701698303, 'id': '3d603b62-7cab-4f74-b671-586fe36306f2'}), MemoryContent(content='AgentChat Termination Termination # In the previous section, we explored how to define agents, and organize them into teams that can solve tasks. However, a run can go on forever, and in many cases, we need to know when to stop them. This is the role of the termination condition. AgentChat supports several termination condition by providing a base TerminationCondition class and several implementations that inherit from it. A termination condition is a callable that takes a sequenceBaseChatMessageent or BaseChatMessage objects since the last time the condition was called , and returns a StopMessage if the conversation should be terminated, or None otherwise. Once a termination condition has been reached, it must be reset by calling reset() before it can be used again. Some important things to note about termination conditions: They are stateful but reset automatically after each run ( run() or run_stream() ) is finished. They can be combined using the AND and OR operators. Note For group chat teams (i.e., RoundRobinGroupChat , SelectorGroupChat , and Swarm ), the termination condition is called after each agent responds. While a response may contain multiple inner messages, the team calls its termination condition just once for all the messages from a single response. So the condition is called with the “delta sequence” of messages since the last time it was called. Built-In Termination Conditions: MaxMessageTermination : Stops after a specified number of messages have been produced,', mime_type='MemoryMimeType.TEXT', metadata={'chunk_index': 1, 'mime_type': 'MemoryMimeType.TEXT', 'source': 'https://msdocs.cn/autogen/stable/user-guide/agentchat-user-guide/tutorial/termination.html', 'score': 0.461774212772051, 'id': '699ef490-d108-4cd3-b629-c1198d6b78ba'})]
---------- rag_assistant ----------
AgentChat is part of the AutoGen framework, a programming environment for building multi-agent applications. In AgentChat, agents can interact with each other and with users to perform various tasks, including web browsing and engaging in dialogue. It utilizes models from OpenAI for chat completions and supports multi-modal input, which means agents can handle inputs that include both text and images. Additionally, AgentChat provides mechanisms to define termination conditions to control when a conversation or task should be concluded, ensuring that the agent interactions are efficient and goal-oriented. TERMINATE
此实现提供了一个 RAG 代理,可以根据 AutoGen 文档回答问题。当提出问题时,内存系统会检索相关块并将其添加到上下文中,从而使助手能够生成知情的响应。
对于生产系统,您可能需要
实施更复杂的分块策略
添加元数据过滤功能
自定义检索评分
为您的特定领域优化嵌入模型