ACA 动态会话代码执行器#
本指南将解释 Azure 容器应用中的动态会话,并展示如何使用 Azure 容器代码执行器类。
Azure 容器应用动态会话是 Azure 容器应用服务中的一个组件。该环境托管在远程 Azure 实例上,不会在本地执行任何代码。该解释器能够在 Jupyter 环境中执行 Python 代码,并预安装了常用的软件包基础。用户可以为其应用程序创建自定义环境。此外,文件可以上传到或下载自每个会话。
代码解释器可以运行多个代码会话,每个会话都由一个会话标识符字符串分隔。
创建容器应用会话池#
在您的 Azure 门户中,创建一个新的 容器 应用 会话 池
资源,并将池类型设置为 Python 代码 解释器
,并记下 池 管理 终结点
。 终结点的格式应类似于 https://{region}.dynamicsessions.io/subscriptions/{subscription_id}/resourceGroups/{resource_group_name}/sessionPools/{session_pool_name}
。
或者,您可以使用 Azure CLI 创建会话池。
ACADynamicSessionsCodeExecutor#
ACADynamicSessionsCodeExecutor
类是一个 Python 代码执行器,它在默认的无服务器代码解释器会话上创建并执行任意 Python 代码。 它的接口如下:
初始化#
首先,您需要找到或创建一个实现 TokenProvider
接口的凭据对象。 这是任何实现以下功能的对象
def get_token(
self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None, **kwargs: Any
) -> azure.core.credentials.AccessToken
此类对象的一个例子是 azure.identity.DefaultAzureCredential 类。
让我们从安装它开始
# pip install azure.identity
接下来,让我们为我们的代码导入所有必要的模块和类
import os
import tempfile
from anyio import open_file
from autogen_core import CancellationToken
from autogen_core.code_executor import CodeBlock
from autogen_ext.code_executors.azure import ACADynamicSessionsCodeExecutor
from azure.identity import DefaultAzureCredential
现在,我们创建我们的 Azure 代码执行器并运行一些测试代码,并验证它是否正确运行。我们将使用一个临时工作目录创建执行器,以确保一个干净的环境,因为我们将展示如何使用每个功能
cancellation_token = CancellationToken()
POOL_MANAGEMENT_ENDPOINT = "..."
with tempfile.TemporaryDirectory() as temp_dir:
executor = ACADynamicSessionsCodeExecutor(
pool_management_endpoint=POOL_MANAGEMENT_ENDPOINT, credential=DefaultAzureCredential(), work_dir=temp_dir
)
code_blocks = [CodeBlock(code="import sys; print('hello world!')", language="python")]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert code_result.exit_code == 0 and "hello world!" in code_result.output
接下来,让我们尝试上传一些文件并验证它们的完整性。所有上传到无服务器代码解释器的文件都上传到 /mnt/data
目录中。所有可下载的文件也必须放置在该目录中。默认情况下,代码执行器的当前工作目录设置为 /mnt/data
。
with tempfile.TemporaryDirectory() as temp_dir:
test_file_1 = "test_upload_1.txt"
test_file_1_contents = "test1 contents"
test_file_2 = "test_upload_2.txt"
test_file_2_contents = "test2 contents"
async with await open_file(os.path.join(temp_dir, test_file_1), "w") as f: # type: ignore[syntax]
await f.write(test_file_1_contents)
async with await open_file(os.path.join(temp_dir, test_file_2), "w") as f: # type: ignore[syntax]
await f.write(test_file_2_contents)
assert os.path.isfile(os.path.join(temp_dir, test_file_1))
assert os.path.isfile(os.path.join(temp_dir, test_file_2))
executor = ACADynamicSessionsCodeExecutor(
pool_management_endpoint=POOL_MANAGEMENT_ENDPOINT, credential=DefaultAzureCredential(), work_dir=temp_dir
)
await executor.upload_files([test_file_1, test_file_2], cancellation_token)
file_list = await executor.get_file_list(cancellation_token)
assert test_file_1 in file_list
assert test_file_2 in file_list
code_blocks = [
CodeBlock(
code=f"""
with open("{test_file_1}") as f:
print(f.read())
with open("{test_file_2}") as f:
print(f.read())
""",
language="python",
)
]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert code_result.exit_code == 0
assert test_file_1_contents in code_result.output
assert test_file_2_contents in code_result.output
下载文件的工作方式类似。
with tempfile.TemporaryDirectory() as temp_dir:
test_file_1 = "test_upload_1.txt"
test_file_1_contents = "test1 contents"
test_file_2 = "test_upload_2.txt"
test_file_2_contents = "test2 contents"
assert not os.path.isfile(os.path.join(temp_dir, test_file_1))
assert not os.path.isfile(os.path.join(temp_dir, test_file_2))
executor = ACADynamicSessionsCodeExecutor(
pool_management_endpoint=POOL_MANAGEMENT_ENDPOINT, credential=DefaultAzureCredential(), work_dir=temp_dir
)
code_blocks = [
CodeBlock(
code=f"""
with open("{test_file_1}", "w") as f:
f.write("{test_file_1_contents}")
with open("{test_file_2}", "w") as f:
f.write("{test_file_2_contents}")
""",
language="python",
),
]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert code_result.exit_code == 0
file_list = await executor.get_file_list(cancellation_token)
assert test_file_1 in file_list
assert test_file_2 in file_list
await executor.download_files([test_file_1, test_file_2], cancellation_token)
assert os.path.isfile(os.path.join(temp_dir, test_file_1))
async with await open_file(os.path.join(temp_dir, test_file_1), "r") as f: # type: ignore[syntax]
content = await f.read()
assert test_file_1_contents in content
assert os.path.isfile(os.path.join(temp_dir, test_file_2))
async with await open_file(os.path.join(temp_dir, test_file_2), "r") as f: # type: ignore[syntax]
content = await f.read()
assert test_file_2_contents in content
新会话#
ACADynamicSessionsCodeExecutor
类的每个实例都将具有一个唯一的会话 ID。 对特定代码执行器的每次调用都将在同一会话上执行,直到在其上调用 restart()
函数为止。 以前的会话无法重用。
在这里,我们将在代码会话上运行一些代码,重新启动它,然后验证是否已打开一个新会话。
executor = ACADynamicSessionsCodeExecutor(
pool_management_endpoint=POOL_MANAGEMENT_ENDPOINT, credential=DefaultAzureCredential()
)
code_blocks = [CodeBlock(code="x = 'abcdefg'", language="python")]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert code_result.exit_code == 0
code_blocks = [CodeBlock(code="print(x)", language="python")]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert code_result.exit_code == 0 and "abcdefg" in code_result.output
await executor.restart()
code_blocks = [CodeBlock(code="print(x)", language="python")]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert code_result.exit_code != 0 and "NameError" in code_result.output
可用软件包#
每个代码执行实例都预安装了大多数常用的软件包。 但是,可用软件包和版本的列表在执行环境之外不可用。 可以通过调用代码执行器上的 get_available_packages()
函数来检索环境中的软件包列表。
print(executor.get_available_packages(cancellation_token))