OpenAI Assistant Agent#
OpenAI Assistant 和 Azure OpenAI Assistant 是用于构建 Agent 的服务器端 API。 它们可以用于在 AutoGen 中构建 Agent。 本实用指南演示了如何使用 OpenAI Assistant 创建一个可以运行代码并在文档上进行问答的 Agent。
消息协议#
首先,我们需要为由 OpenAI Assistant 支持的 Agent 指定消息协议。 消息协议定义了 Agent 处理和发布的消息的结构。 为了说明,我们定义了一个包含 4 种消息类型的简单消息协议:TextMessage
、Reset
、UploadForCodeInterpreter
和 UploadForFileSearch
。
from dataclasses import dataclass


@dataclass
class TextMessage:
    """A chat message exchanged with the assistant agent."""

    content: str  # the message text
    source: str  # name of the sender


@dataclass
class Reset:
    """Control message asking the agent to clear its conversation memory."""

    pass


@dataclass
class UploadForCodeInterpreter:
    """Request to upload a local data file for the code interpreter tool."""

    file_path: str  # local path of the file to upload


@dataclass
class UploadForFileSearch:
    """Request to upload a local document into a vector store for file search."""

    file_path: str  # local path of the document to upload
    vector_store_id: str  # ID of the target vector store
TextMessage
消息类型用于与 Agent 通信。 它有一个 content
字段,包含消息内容,以及一个 source
字段,用于指示发送者。Reset
消息类型是一种控制消息,用于重置 Agent 的内存。 它没有字段。 当我们需要与 Agent 开始新的对话时,这很有用。
UploadForCodeInterpreter
消息类型用于为代码解释器上传数据文件,而 UploadForFileSearch
消息类型用于为文件搜索上传文档。 两种消息类型都具有一个 file_path
字段,其中包含要上传的文件的本地路径。
定义 Agent#
接下来,我们定义 Agent 类。 Agent 类构造函数具有以下参数:description
、client
、assistant_id
、thread_id
和 assistant_event_handler_factory
。client
参数是 OpenAI 异步客户端对象,assistant_event_handler_factory
用于创建助理事件处理程序以处理 OpenAI Assistant 事件。 这可以用于从助理创建流式输出。
Agent 类具有以下消息处理程序:
handle_message
: 处理TextMessage
消息类型,并从助理发回响应。on_reset
: 处理Reset
消息类型,并重置助理 Agent 的内存。on_upload_for_code_interpreter
: 处理UploadForCodeInterpreter
消息类型,并将文件上传到代码解释器。on_upload_for_file_search
: 处理UploadForFileSearch
消息类型,并将文档上传到文件搜索。
助理的内存存储在线程中,该线程保存在服务器端。 该线程由 thread_id
参数引用。
import asyncio
import os
from typing import Any, Callable, List
import aiofiles
from autogen_core import AgentId, MessageContext, RoutedAgent, message_handler
from openai import AsyncAssistantEventHandler, AsyncClient
from openai.types.beta.thread import ToolResources, ToolResourcesFileSearch
class OpenAIAssistantAgent(RoutedAgent):
    """An agent implementation that uses the OpenAI Assistant API to generate
    responses.

    Args:
        description (str): The description of the agent.
        client (openai.AsyncClient): The client to use for the OpenAI API.
        assistant_id (str): The assistant ID to use for the OpenAI API.
        thread_id (str): The thread ID to use for the OpenAI API.
        assistant_event_handler_factory (Callable[[], AsyncAssistantEventHandler], optional):
            A factory function to create an async assistant event handler. Defaults to None.
            If provided, the agent will use the streaming mode with the event handler.
            If not provided, the agent will use the blocking mode to generate responses.
    """

    def __init__(
        self,
        description: str,
        client: AsyncClient,
        assistant_id: str,
        thread_id: str,
        assistant_event_handler_factory: Callable[[], AsyncAssistantEventHandler],
    ) -> None:
        super().__init__(description)
        self._client = client
        self._assistant_id = assistant_id
        self._thread_id = thread_id
        self._assistant_event_handler_factory = assistant_event_handler_factory

    @message_handler()
    async def handle_message(self, message: TextMessage, ctx: MessageContext) -> TextMessage:
        """Handle a message. This method adds the message to the thread, runs the
        assistant, and returns the assistant's latest text reply."""
        # Save the message to the server-side thread.
        await ctx.cancellation_token.link_future(
            asyncio.ensure_future(
                self._client.beta.threads.messages.create(
                    thread_id=self._thread_id,
                    content=message.content,
                    role="user",
                    metadata={"sender": message.source},
                )
            )
        )
        # Generate a response; events stream to the handler created by the factory.
        async with self._client.beta.threads.runs.stream(
            thread_id=self._thread_id,
            assistant_id=self._assistant_id,
            event_handler=self._assistant_event_handler_factory(),
        ) as stream:
            await ctx.cancellation_token.link_future(asyncio.ensure_future(stream.until_done()))
        # Get the last message (newest first, limit 1).
        messages = await ctx.cancellation_token.link_future(
            asyncio.ensure_future(self._client.beta.threads.messages.list(self._thread_id, order="desc", limit=1))
        )
        last_message_content = messages.data[0].content
        # Get the text content from the last message.
        text_content = [content for content in last_message_content if content.type == "text"]
        if not text_content:
            raise ValueError(f"Expected text content in the last message: {last_message_content}")
        return TextMessage(content=text_content[0].text.value, source=self.metadata["type"])

    @message_handler()
    async def on_reset(self, message: Reset, ctx: MessageContext) -> None:
        """Handle a reset message. This method deletes all messages in the thread."""
        # Collect the IDs of every message in this thread, following pagination
        # with the `after` cursor.
        all_msgs: List[str] = []
        while True:
            if not all_msgs:
                msgs = await ctx.cancellation_token.link_future(
                    asyncio.ensure_future(self._client.beta.threads.messages.list(self._thread_id))
                )
            else:
                msgs = await ctx.cancellation_token.link_future(
                    asyncio.ensure_future(self._client.beta.threads.messages.list(self._thread_id, after=all_msgs[-1]))
                )
            for msg in msgs.data:
                all_msgs.append(msg.id)
            if not msgs.has_next_page():
                break
        # Delete all the messages.
        for msg_id in all_msgs:
            status = await ctx.cancellation_token.link_future(
                asyncio.ensure_future(
                    self._client.beta.threads.messages.delete(message_id=msg_id, thread_id=self._thread_id)
                )
            )
            assert status.deleted is True

    @message_handler()
    async def on_upload_for_code_interpreter(self, message: UploadForCodeInterpreter, ctx: MessageContext) -> None:
        """Handle an upload for code interpreter. This method uploads a file and
        attaches it to the thread's code-interpreter tool resources."""
        # Read the file content from the local path.
        async with aiofiles.open(message.file_path, mode="rb") as f:
            file_content = await ctx.cancellation_token.link_future(asyncio.ensure_future(f.read()))
        file_name = os.path.basename(message.file_path)
        # Upload the file.
        file = await ctx.cancellation_token.link_future(
            asyncio.ensure_future(self._client.files.create(file=(file_name, file_content), purpose="assistants"))
        )
        # Get existing file ids from tool resources.
        thread = await ctx.cancellation_token.link_future(
            asyncio.ensure_future(self._client.beta.threads.retrieve(thread_id=self._thread_id))
        )
        tool_resources: ToolResources = thread.tool_resources if thread.tool_resources else ToolResources()
        # FIX: previously, when the thread already had files attached, the newly
        # uploaded file id was dropped instead of appended. Also guard against
        # `code_interpreter` being None on a fresh thread instead of asserting.
        if tool_resources.code_interpreter and tool_resources.code_interpreter.file_ids:
            file_ids = list(tool_resources.code_interpreter.file_ids) + [file.id]
        else:
            file_ids = [file.id]
        # Update thread with new file.
        await ctx.cancellation_token.link_future(
            asyncio.ensure_future(
                self._client.beta.threads.update(
                    thread_id=self._thread_id,
                    tool_resources={
                        "code_interpreter": {"file_ids": file_ids},
                    },
                )
            )
        )

    @message_handler()
    async def on_upload_for_file_search(self, message: UploadForFileSearch, ctx: MessageContext) -> None:
        """Handle an upload for file search. This method uploads a file and updates the vector store."""
        # Read the file content from the local path.
        async with aiofiles.open(message.file_path, mode="rb") as file:
            file_content = await ctx.cancellation_token.link_future(asyncio.ensure_future(file.read()))
        file_name = os.path.basename(message.file_path)
        # Upload the file into the vector store and poll until indexing completes.
        await ctx.cancellation_token.link_future(
            asyncio.ensure_future(
                self._client.vector_stores.file_batches.upload_and_poll(
                    vector_store_id=message.vector_store_id,
                    files=[(file_name, file_content)],
                )
            )
        )
Agent 类是 OpenAI Assistant API 的一个薄封装,用于实现消息协议。 可以通过扩展消息协议来添加更多功能,例如多模式消息处理。
助理事件处理程序#
助理事件处理程序提供回调来处理 Assistant API 的特定事件。 这对于处理来自助理的流式输出以及进一步的用户界面集成非常有用。
from openai import AsyncAssistantEventHandler, AsyncClient
from openai.types.beta.threads import Message, Text, TextDelta
from openai.types.beta.threads.runs import RunStep, RunStepDelta
from typing_extensions import override
class EventHandler(AsyncAssistantEventHandler):
    """Streams assistant events to stdout: text deltas, code-interpreter input,
    and file-search citations."""

    @override
    async def on_text_delta(self, delta: TextDelta, snapshot: Text) -> None:
        # Print streaming text chunks as they arrive.
        print(delta.value, end="", flush=True)

    @override
    async def on_run_step_created(self, run_step: RunStep) -> None:
        # Announce the start of a code-interpreter tool call with an open code fence.
        details = run_step.step_details
        if details.type == "tool_calls":
            for tool in details.tool_calls:
                if tool.type == "code_interpreter":
                    print("\nGenerating code to interpret:\n\n```python")

    @override
    async def on_run_step_done(self, run_step: RunStep) -> None:
        # Close the code fence once the code-interpreter call finishes.
        details = run_step.step_details
        if details.type == "tool_calls":
            for tool in details.tool_calls:
                if tool.type == "code_interpreter":
                    print("\n```\nExecuting code...")

    @override
    async def on_run_step_delta(self, delta: RunStepDelta, snapshot: RunStep) -> None:
        # Stream the code-interpreter input (the generated code) as it is produced.
        details = delta.step_details
        if details is not None and details.type == "tool_calls":
            for tool in details.tool_calls or []:
                if tool.type == "code_interpreter" and tool.code_interpreter and tool.code_interpreter.input:
                    print(tool.code_interpreter.input, end="", flush=True)

    @override
    async def on_message_created(self, message: Message) -> None:
        print(f"{'-'*80}\nAssistant:\n")

    @override
    async def on_message_done(self, message: Message) -> None:
        """Print a numbered citation for each file referenced by the final message."""
        if not message.content:
            return
        content = message.content[0]
        if not content.type == "text":
            return
        text_content = content.text
        annotations = text_content.annotations
        citations: List[str] = []
        # FIX: create at most one client for all citation lookups; the original
        # constructed a new AsyncClient on every iteration of the loop.
        client = None
        for index, annotation in enumerate(annotations):
            # Replace the inline annotation marker with a numbered reference.
            text_content.value = text_content.value.replace(annotation.text, f"[{index}]")
            if file_citation := getattr(annotation, "file_citation", None):
                if client is None:
                    client = AsyncClient()
                cited_file = await client.files.retrieve(file_citation.file_id)
                citations.append(f"[{index}] {cited_file.filename}")
        if citations:
            print("\n".join(citations))
使用 Agent#
首先,我们需要使用 openai
客户端来创建实际的助理、线程和向量存储。 我们的 AutoGen Agent 将使用这些。
import openai

# Create an assistant equipped with the code interpreter and file search tools.
oai_assistant = openai.beta.assistants.create(
    model="gpt-4o-mini",
    description="An AI assistant that helps with everyday tasks.",
    instructions="Help the user with their task.",
    tools=[{"type": "code_interpreter"}, {"type": "file_search"}],
)

# Create a vector store to be used for file search.
vector_store = openai.vector_stores.create()

# Create a thread which serves as the assistant's memory, wired to the vector
# store so file search can find uploaded documents.
thread = openai.beta.threads.create(
    tool_resources={"file_search": {"vector_store_ids": [vector_store.id]}},
)
然后,我们创建一个运行时,并在运行时中为此 Agent 注册一个 Agent 工厂函数。
from autogen_core import SingleThreadedAgentRuntime
runtime = SingleThreadedAgentRuntime()
await OpenAIAssistantAgent.register(
runtime,
"assistant",
lambda: OpenAIAssistantAgent(
description="OpenAI Assistant Agent",
client=openai.AsyncClient(),
assistant_id=oai_assistant.id,
thread_id=thread.id,
assistant_event_handler_factory=lambda: EventHandler(),
),
)
agent = AgentId("assistant", "default")
让我们打开日志记录,看看幕后发生了什么。
import logging

# Show DEBUG-level logs from autogen_core while keeping everything else at WARNING.
logging.basicConfig(level=logging.WARNING)
logging.getLogger("autogen_core").setLevel(logging.DEBUG)
让我们向 Agent 发送一条问候消息,并查看流式返回的响应。
# Start the runtime, send a greeting to the agent, and stop once idle.
runtime.start()
await runtime.send_message(TextMessage(content="Hello, how are you today!", source="user"), agent)
await runtime.stop_when_idle()
INFO:autogen_core:Sending message of type TextMessage to assistant: {'content': 'Hello, how are you today!', 'source': 'user'}
INFO:autogen_core:Calling message handler for assistant:default with message type TextMessage sent by Unknown
--------------------------------------------------------------------------------
Assistant:
Hello! I'm here and ready to assist you. How can I help you today?
INFO:autogen_core:Resolving response with message type TextMessage for recipient None from assistant: {'content': "Hello! I'm here and ready to assist you. How can I help you today?", 'source': 'assistant'}
带有代码解释器的助理#
让我们向 Agent 提出一些数学问题,看看它是否使用代码解释器来回答问题。
# Ask a math question; the assistant is expected to use the code interpreter.
runtime.start()
await runtime.send_message(TextMessage(content="What is 1332322 x 123212?", source="user"), agent)
await runtime.stop_when_idle()
INFO:autogen_core:Sending message of type TextMessage to assistant: {'content': 'What is 1332322 x 123212?', 'source': 'user'}
INFO:autogen_core:Calling message handler for assistant:default with message type TextMessage sent by Unknown
# Calculating the product of 1332322 and 123212
result = 1332322 * 123212
result
```
Executing code...
--------------------------------------------------------------------------------
Assistant:
The product of 1,332,322 and 123,212 is 164,158,058,264.
INFO:autogen_core:Resolving response with message type TextMessage for recipient None from assistant: {'content': 'The product of 1,332,322 and 123,212 is 164,158,058,264.', 'source': 'assistant'}
让我们从 Seattle Open Data 门户获取一些数据。 我们将使用 City of Seattle Wage Data。 让我们先下载它。
import requests

# Download the City of Seattle wage dataset as CSV.
# FIX: add a timeout so the request cannot hang forever, and fail fast on HTTP
# errors instead of silently writing an error page to disk.
response = requests.get("https://data.seattle.gov/resource/2khk-5ukd.csv", timeout=60)
response.raise_for_status()
with open("seattle_city_wages.csv", "wb") as file:
    file.write(response.content)
让我们使用 UploadForCodeInterpreter
消息将文件发送到 Agent。
# Upload the downloaded CSV so the assistant's code interpreter can read it.
runtime.start()
await runtime.send_message(UploadForCodeInterpreter(file_path="seattle_city_wages.csv"), agent)
await runtime.stop_when_idle()
INFO:autogen_core:Sending message of type UploadForCodeInterpreter to assistant: {'file_path': 'seattle_city_wages.csv'}
INFO:autogen_core:Calling message handler for assistant:default with message type UploadForCodeInterpreter sent by Unknown
INFO:autogen_core:Resolving response with message type NoneType for recipient None from assistant: None
我们现在可以向 Agent 提出一些关于数据的问题。
# Ask the assistant to inspect the uploaded CSV file.
runtime.start()
await runtime.send_message(TextMessage(content="Take a look at the uploaded CSV file.", source="user"), agent)
await runtime.stop_when_idle()
INFO:autogen_core:Sending message of type TextMessage to assistant: {'content': 'Take a look at the uploaded CSV file.', 'source': 'user'}
INFO:autogen_core:Calling message handler for assistant:default with message type TextMessage sent by Unknown
import pandas as pd
# Load the uploaded CSV file to examine its contents
file_path = '/mnt/data/file-oEvRiyGyHc2jZViKyDqL8aoh'
csv_data = pd.read_csv(file_path)
# Display the first few rows of the dataframe to understand its structure
csv_data.head()
```
Executing code...
--------------------------------------------------------------------------------
Assistant:
The uploaded CSV file contains the following columns:
1. **department**: The department in which the individual works.
2. **last_name**: The last name of the employee.
3. **first_name**: The first name of the employee.
4. **job_title**: The job title of the employee.
5. **hourly_rate**: The hourly rate for the employee's position.
Here are the first few entries from the file:
| department | last_name | first_name | job_title | hourly_rate |
|--------------------------------|-----------|------------|------------------------------------|-------------|
| Police Department | Aagard | Lori | Pol Capt-Precinct | 112.70 |
| Police Department | Aakervik | Dag | Pol Ofcr-Detective | 75.61 |
| Seattle City Light | Aaltonen | Evan | Pwrline Clear Tree Trimmer | 53.06 |
| Seattle Public Utilities | Aar | Abdimallik | Civil Engrng Spec,Sr | 64.43 |
| Seattle Dept of Transportation | Abad | Abigail | Admin Spec II-BU | 37.40 |
If you need any specific analysis or information from this data, please let me know!
INFO:autogen_core:Resolving response with message type TextMessage for recipient None from assistant: {'content': "The uploaded CSV file contains the following columns:\n\n1. **department**: The department in which the individual works.\n2. **last_name**: The last name of the employee.\n3. **first_name**: The first name of the employee.\n4. **job_title**: The job title of the employee.\n5. **hourly_rate**: The hourly rate for the employee's position.\n\nHere are the first few entries from the file:\n\n| department | last_name | first_name | job_title | hourly_rate |\n|--------------------------------|-----------|------------|------------------------------------|-------------|\n| Police Department | Aagard | Lori | Pol Capt-Precinct | 112.70 |\n| Police Department | Aakervik | Dag | Pol Ofcr-Detective | 75.61 |\n| Seattle City Light | Aaltonen | Evan | Pwrline Clear Tree Trimmer | 53.06 |\n| Seattle Public Utilities | Aar | Abdimallik | Civil Engrng Spec,Sr | 64.43 |\n| Seattle Dept of Transportation | Abad | Abigail | Admin Spec II-BU | 37.40 |\n\nIf you need any specific analysis or information from this data, please let me know!", 'source': 'assistant'}
# Ask a follow-up analysis question about the same data.
runtime.start()
await runtime.send_message(TextMessage(content="What are the top-10 salaries?", source="user"), agent)
await runtime.stop_when_idle()
INFO:autogen_core:Sending message of type TextMessage to assistant: {'content': 'What are the top-10 salaries?', 'source': 'user'}
INFO:autogen_core:Calling message handler for assistant:default with message type TextMessage sent by Unknown
# Sorting the data by hourly_rate in descending order and selecting the top 10 salaries
top_10_salaries = csv_data[['first_name', 'last_name', 'job_title', 'hourly_rate']].sort_values(by='hourly_rate', ascending=False).head(10)
top_10_salaries.reset_index(drop=True, inplace=True)
top_10_salaries
```
Executing code...
--------------------------------------------------------------------------------
Assistant:
Here are the top 10 salaries based on the hourly rates from the CSV file:
| First Name | Last Name | Job Title | Hourly Rate |
|------------|-----------|------------------------------------|-------------|
| Eric | Barden | Executive4 | 139.61 |
| Idris | Beauregard| Executive3 | 115.90 |
| Lori | Aagard | Pol Capt-Precinct | 112.70 |
| Krista | Bair | Pol Capt-Precinct | 108.74 |
| Amy | Bannister | Fire Chief, Dep Adm-80 Hrs | 104.07 |
| Ginger | Armbruster| Executive2 | 102.42 |
| William | Andersen | Executive2 | 102.42 |
| Valarie | Anderson | Executive2 | 102.42 |
| Paige | Alderete | Executive2 | 102.42 |
| Kathryn | Aisenberg | Executive2 | 100.65 |
If you need any further details or analysis, let me know!
INFO:autogen_core:Resolving response with message type TextMessage for recipient None from assistant: {'content': 'Here are the top 10 salaries based on the hourly rates from the CSV file:\n\n| First Name | Last Name | Job Title | Hourly Rate |\n|------------|-----------|------------------------------------|-------------|\n| Eric | Barden | Executive4 | 139.61 |\n| Idris | Beauregard| Executive3 | 115.90 |\n| Lori | Aagard | Pol Capt-Precinct | 112.70 |\n| Krista | Bair | Pol Capt-Precinct | 108.74 |\n| Amy | Bannister | Fire Chief, Dep Adm-80 Hrs | 104.07 |\n| Ginger | Armbruster| Executive2 | 102.42 |\n| William | Andersen | Executive2 | 102.42 |\n| Valarie | Anderson | Executive2 | 102.42 |\n| Paige | Alderete | Executive2 | 102.42 |\n| Kathryn | Aisenberg | Executive2 | 100.65 |\n\nIf you need any further details or analysis, let me know!', 'source': 'assistant'}
带有文件搜索的助理#
让我们尝试一下文档问答功能。 我们首先下载关于第三次英国-阿富汗战争的 Wikipedia 页面。
# Download the Wikipedia page on the Third Anglo-Afghan War.
# FIX: add a timeout so the request cannot hang forever, and fail fast on HTTP
# errors instead of silently writing an error page to disk.
response = requests.get("https://en.wikipedia.org/wiki/Third_Anglo-Afghan_War", timeout=60)
response.raise_for_status()
with open("third_anglo_afghan_war.html", "wb") as file:
    file.write(response.content)
使用 UploadForFileSearch
消息将文件发送到 Agent。
# Upload the downloaded page into the vector store for file search.
runtime.start()
await runtime.send_message(
    UploadForFileSearch(file_path="third_anglo_afghan_war.html", vector_store_id=vector_store.id), agent
)
await runtime.stop_when_idle()
INFO:autogen_core:Sending message of type UploadForFileSearch to assistant: {'file_path': 'third_anglo_afghan_war.html', 'vector_store_id': 'vs_h3xxPbJFnd1iZ9WdjsQwNdrp'}
INFO:autogen_core:Calling message handler for assistant:default with message type UploadForFileSearch sent by Unknown
INFO:autogen_core:Resolving response with message type NoneType for recipient None from assistant: None
让我们向 Agent 提出一些关于文档的问题。 在提问之前,我们重置 Agent 内存以开始新的对话。
# Reset the agent's memory to start a fresh conversation, then ask a question
# that must be answered from the uploaded document via file search.
runtime.start()
await runtime.send_message(Reset(), agent)
await runtime.send_message(
    TextMessage(
        content="When and where was the treaty of Rawalpindi signed? Answer using the document provided.", source="user"
    ),
    agent,
)
await runtime.stop_when_idle()
INFO:autogen_core:Sending message of type Reset to assistant: {}
INFO:autogen_core:Calling message handler for assistant:default with message type Reset sent by Unknown
INFO:autogen_core:Resolving response with message type NoneType for recipient None from assistant: None
INFO:autogen_core:Sending message of type TextMessage to assistant: {'content': 'When and where was the treaty of Rawalpindi signed? Answer using the document provided.', 'source': 'user'}
INFO:autogen_core:Calling message handler for assistant:default with message type TextMessage sent by Unknown
--------------------------------------------------------------------------------
Assistant:
The Treaty of Rawalpindi was signed on **8 August 1919**. The location of the signing was in **Rawalpindi**, which is in present-day Pakistan【6:0†source】.
INFO:autogen_core:Resolving response with message type TextMessage for recipient None from assistant: {'content': 'The Treaty of Rawalpindi was signed on **8 August 1919**. The location of the signing was in **Rawalpindi**, which is in present-day Pakistan【6:0†source】.', 'source': 'assistant'}
[0] third_anglo_afghan_war.html
就是这样! 我们已经成功构建了一个由 OpenAI Assistant 支持的 Agent。