ACA 动态会话代码执行器#

本指南将解释 Azure 容器应用中的动态会话,并展示如何使用 Azure 容器代码执行器类。

Azure 容器应用动态会话是 Azure 容器应用服务中的一个组件。该环境托管在远程 Azure 实例上,不会在本地执行任何代码。该解释器能够在 Jupyter 环境中执行 Python 代码,并预安装了常用的软件包基础。用户可以为其应用程序创建自定义环境。此外,文件可以上传到或下载自每个会话。

代码解释器可以运行多个代码会话,每个会话都由一个会话标识符字符串分隔。

创建容器应用会话池#

在您的 Azure 门户中,创建一个新的 容器 应用 会话 资源,并将池类型设置为 Python 代码 解释器,并记下 管理 终结点。 终结点的格式应类似于 https://{region}.dynamicsessions.io/subscriptions/{subscription_id}/resourceGroups/{resource_group_name}/sessionPools/{session_pool_name}

或者,您可以使用 Azure CLI 创建会话池。

ACADynamicSessionsCodeExecutor#

ACADynamicSessionsCodeExecutor 类是一个 Python 代码执行器,它在默认的无服务器代码解释器会话上创建并执行任意 Python 代码。 它的接口如下:

初始化#

首先,您需要找到或创建一个实现 TokenProvider 接口的凭据对象。 这是任何实现以下功能的对象

def get_token(
    self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None, **kwargs: Any
) -> azure.core.credentials.AccessToken

此类对象的一个例子是 azure.identity.DefaultAzureCredential 类。

让我们从安装它开始

# pip install azure.identity

接下来,让我们为我们的代码导入所有必要的模块和类

import os
import tempfile

from anyio import open_file
from autogen_core import CancellationToken
from autogen_core.code_executor import CodeBlock
from autogen_ext.code_executors.azure import ACADynamicSessionsCodeExecutor
from azure.identity import DefaultAzureCredential

现在,我们创建我们的 Azure 代码执行器并运行一些测试代码,并验证它是否正确运行。我们将使用一个临时工作目录创建执行器,以确保一个干净的环境,因为我们将展示如何使用每个功能

cancellation_token = CancellationToken()
POOL_MANAGEMENT_ENDPOINT = "..."

with tempfile.TemporaryDirectory() as temp_dir:
    executor = ACADynamicSessionsCodeExecutor(
        pool_management_endpoint=POOL_MANAGEMENT_ENDPOINT, credential=DefaultAzureCredential(), work_dir=temp_dir
    )

    code_blocks = [CodeBlock(code="import sys; print('hello world!')", language="python")]
    code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
    assert code_result.exit_code == 0 and "hello world!" in code_result.output

接下来,让我们尝试上传一些文件并验证它们的完整性。所有上传到无服务器代码解释器的文件都上传到 /mnt/data 目录中。所有可下载的文件也必须放置在该目录中。默认情况下,代码执行器的当前工作目录设置为 /mnt/data

with tempfile.TemporaryDirectory() as temp_dir:
    test_file_1 = "test_upload_1.txt"
    test_file_1_contents = "test1 contents"
    test_file_2 = "test_upload_2.txt"
    test_file_2_contents = "test2 contents"

    async with await open_file(os.path.join(temp_dir, test_file_1), "w") as f:  # type: ignore[syntax]
        await f.write(test_file_1_contents)
    async with await open_file(os.path.join(temp_dir, test_file_2), "w") as f:  # type: ignore[syntax]
        await f.write(test_file_2_contents)

    assert os.path.isfile(os.path.join(temp_dir, test_file_1))
    assert os.path.isfile(os.path.join(temp_dir, test_file_2))

    executor = ACADynamicSessionsCodeExecutor(
        pool_management_endpoint=POOL_MANAGEMENT_ENDPOINT, credential=DefaultAzureCredential(), work_dir=temp_dir
    )
    await executor.upload_files([test_file_1, test_file_2], cancellation_token)

    file_list = await executor.get_file_list(cancellation_token)
    assert test_file_1 in file_list
    assert test_file_2 in file_list

    code_blocks = [
        CodeBlock(
            code=f"""
with open("{test_file_1}") as f:
  print(f.read())
with open("{test_file_2}") as f:
  print(f.read())
""",
            language="python",
        )
    ]
    code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
    assert code_result.exit_code == 0
    assert test_file_1_contents in code_result.output
    assert test_file_2_contents in code_result.output

下载文件的工作方式类似。

with tempfile.TemporaryDirectory() as temp_dir:
    test_file_1 = "test_upload_1.txt"
    test_file_1_contents = "test1 contents"
    test_file_2 = "test_upload_2.txt"
    test_file_2_contents = "test2 contents"

    assert not os.path.isfile(os.path.join(temp_dir, test_file_1))
    assert not os.path.isfile(os.path.join(temp_dir, test_file_2))

    executor = ACADynamicSessionsCodeExecutor(
        pool_management_endpoint=POOL_MANAGEMENT_ENDPOINT, credential=DefaultAzureCredential(), work_dir=temp_dir
    )

    code_blocks = [
        CodeBlock(
            code=f"""
with open("{test_file_1}", "w") as f:
  f.write("{test_file_1_contents}")
with open("{test_file_2}", "w") as f:
  f.write("{test_file_2_contents}")
""",
            language="python",
        ),
    ]
    code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
    assert code_result.exit_code == 0

    file_list = await executor.get_file_list(cancellation_token)
    assert test_file_1 in file_list
    assert test_file_2 in file_list

    await executor.download_files([test_file_1, test_file_2], cancellation_token)

    assert os.path.isfile(os.path.join(temp_dir, test_file_1))
    async with await open_file(os.path.join(temp_dir, test_file_1), "r") as f:  # type: ignore[syntax]
        content = await f.read()
        assert test_file_1_contents in content
    assert os.path.isfile(os.path.join(temp_dir, test_file_2))
    async with await open_file(os.path.join(temp_dir, test_file_2), "r") as f:  # type: ignore[syntax]
        content = await f.read()
        assert test_file_2_contents in content

新会话#

ACADynamicSessionsCodeExecutor 类的每个实例都将具有一个唯一的会话 ID。 对特定代码执行器的每次调用都将在同一会话上执行,直到在其上调用 restart() 函数为止。 以前的会话无法重用。

在这里,我们将在代码会话上运行一些代码,重新启动它,然后验证是否已打开一个新会话。

executor = ACADynamicSessionsCodeExecutor(
    pool_management_endpoint=POOL_MANAGEMENT_ENDPOINT, credential=DefaultAzureCredential()
)

code_blocks = [CodeBlock(code="x = 'abcdefg'", language="python")]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert code_result.exit_code == 0

code_blocks = [CodeBlock(code="print(x)", language="python")]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert code_result.exit_code == 0 and "abcdefg" in code_result.output

await executor.restart()
code_blocks = [CodeBlock(code="print(x)", language="python")]
code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)
assert code_result.exit_code != 0 and "NameError" in code_result.output

可用软件包#

每个代码执行实例都预安装了大多数常用的软件包。 但是,可用软件包和版本的列表在执行环境之外不可用。 可以通过调用代码执行器上的 get_available_packages() 函数来检索环境中的软件包列表。

print(executor.get_available_packages(cancellation_token))