在 Azure ML 管道中运行流#

作者:  头像 头像在 GitHub 上打开

为什么使用 Azure 机器学习 (ML) 管道在云端运行流?#

在实际场景中,流服务于各种目的。例如,考虑一个旨在评估人机交互会话相关性分数的流。假设您希望每晚触发此流以评估今天的性能,并避免 LLM(语言模型)终结点的高峰时段。在这种常见场景中,人们通常会遇到以下需求:

  • 处理大型数据输入:一次运行数千或数百万数据输入的流。

  • 可伸缩性和效率:需要一个可伸缩、高效且具有弹性的平台来确保成功。

  • 自动化:当上游数据就绪或以固定间隔自动触发批处理流。

Azure ML 管道有效地解决了所有这些离线需求。通过提示流和 Azure ML 管道的集成,流用户可以非常轻松地实现上述目标,在本教程中,您将学习:

  • 如何使用 Python SDK 自动将流转换为 Azure ML 管道中的“步骤”。

  • 如何将数据输入管道以触发批处理流运行。

  • 如何在提示流步骤之前或之后构建其他管道步骤。例如数据预处理或结果聚合。

  • 如何设置一个简单的管道调度程序。

  • 如何将管道部署到 Azure ML 批处理终结点。然后,我可以在需要时使用新数据调用它。

在开始之前,请考虑以下先决条件:

1. 连接到 Azure 机器学习工作区#

工作区是 Azure 机器学习的顶级资源,提供了一个集中位置来处理您在使用 Azure 机器学习时创建的所有工件。本节我们将连接到将运行作业的工作区。

1.1 导入所需的库#

# import required libraries
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient, load_component, Input, Output
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.dsl import pipeline

1.2 配置凭据#

我们使用 DefaultAzureCredential 来获取对工作区的访问权限。DefaultAzureCredential 应该能够处理大多数 Azure SDK 身份验证场景。

如果它对您不起作用,请参考更多可用的凭据:配置凭据示例azure-identity 参考文档

try:
    credential = DefaultAzureCredential()
    # Check if given credential can get token successfully.
    credential.get_token("https://management.azure.com/.default")
except Exception as ex:
    # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
    credential = InteractiveBrowserCredential()

1.3 获取工作区的句柄#

我们使用“配置文件”连接到您的工作区。请检查此笔记本以从 Azure ML 工作区门户获取您的配置文件并将其粘贴到此文件夹中。然后,如果您通过下一个代码块,则表示您已为环境做好所有准备。

# Get a handle to workspace
ml_client = MLClient.from_config(credential=credential)

# Retrieve an already attached Azure Machine Learning Compute.
cluster_name = "cpu-cluster"
print(ml_client.compute.get(cluster_name))

2. 将流作为组件加载#

如果您已经使用 Promptflow SDK 或门户创作了一个流,您可以在流文件夹中找到 flow.dag.yaml 文件。此 YAML 规范对于将流加载到 Azure ML 组件中至关重要。

备注:要将load_component函数与 flow.dag.yaml 一起使用,请确保以下各项:

  • $schema应在目标 DAG yaml 文件中定义。例如:$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json

  • 必须通过验证文件“/.promptflow/flow.tools.json”来生成并保持流元数据最新。如果它不存在,运行以下命令来生成和更新它:pf flow validate --source <my-flow-directory>

flow_component = load_component("../../flows/standard/web-classification/flow.dag.yaml")

当使用load_component函数和流 YAML 规范时,您的流会自动转换为并行组件。此并行组件专为大规模、离线、并行化处理而设计,具有效率和弹性。以下是此自动转换组件的一些关键特性:

  • 预定义的输入和输出端口

端口名称

类型

描述

数据

uri_folder 或 uri_file

接受批处理数据输入到您的流中。如果您的数据是单个文件,您可以使用uri_file数据类型;如果您的文件夹包含多个具有相同架构的文件,则可以使用uri_folder数据类型。默认数据类型是 jsonl,但您可以在管道中声明此流组件的实例后自定义此设置。请注意,您的数据将转换为数据帧,因此请确保您的 CSV 或 TSV 数据包含标题行以进行正确映射。

flow_outputs

uri_file

生成一个名为 parallel_run_step.jsonl 的单个输出文件。此数据文件中的每一行对应一个 JSON 对象,表示流返回,以及一个名为 line_number 的附加列,指示其在原始文件中的位置。

debug_info

uri_folder

如果您在调试模式下运行流组件,此端口将为您的每一行运行提供调试信息。例如,步骤之间的中间输出,或 LLM 响应和令牌使用情况。

prompt flow base component image

注意

当您使用多个节点运行 pf 组件时,flow_outputsdebug_info输出需要将输出模式设置为mount模式。

  • 自动生成的参数

    这些参数表示所有流输入以及与流步骤关联的连接。您可以在流/运行定义中设置默认值,并且可以在作业提交期间进一步自定义它们。以 'web-classification' 示例流为例,此流只有一个名为 'url' 的输入和 2 个 LLM 步骤 'summarize_text_content' 和 'classify_with_llm'。此流组件的输入参数为:

    prompt flow base component image

  • 自动生成的环境

    创建的组件的环境将继承自最新的提示流运行时镜像。用户可以通过在flow.dag.yaml中指定environment属性,以及位于同一流文件夹下的 'requirements.txt' 文件,在环境中包含自定义包。

       ...
       environment:
          python_requirements_txt: requirements.txt
    

3. 构建管道#

3.1 声明输入和输出#

要为您的管道提供数据,您需要使用pathtypemode属性声明一个输入。请注意:mount是文件或文件夹数据输入的默认且建议模式。

声明管道输出是可选的。但是,如果您需要云中自定义的输出路径,您可以按照下面的示例设置数据存储上的路径。有关有效路径值的更多详细信息,请参阅此文档 - 管理管道输入输出

data_input = Input(
    path="../../flows/standard/web-classification/data.jsonl",
    type=AssetTypes.URI_FILE,
    mode="mount",
)

pipeline_output = Output(
    # Provide custom flow output file path if needed
    # path="azureml://datastores/<data_store_name>/paths/<path>",
    type=AssetTypes.URI_FOLDER,
    # rw_mount is suggested for flow output
    mode="rw_mount",
)

3.2.1 使用单个流组件运行管道#

由于所有 Promptflow 组件都基于 Azure ML 并行组件,因此用户可以利用特定的运行设置来控制流运行的并行化。以下是一些有用的设置:

运行设置

描述

允许值

默认值

PF_INPUT_FORMAT

当使用uri_folder作为输入数据时,此设置允许您指定哪些文件扩展名应被视为用于初始化流运行的数据文件。

json, jsonl, csv, tsv

jsonl

计算

定义将用于此作业的 Azure ML 工作区中的计算群集。

instance_count

定义将分配给此作业的计算群集中的节点数。

从 1 到计算群集的节点数。

1

max_concurrency_per_instance

定义在 1 个节点上并行运行流的专用处理器数量。当与 'instance_count' 设置结合使用时,您的流的总并行化将是 instance_count*max_concurrency_per_instance。

>1

1

mini_batch_size

定义每个 mini-batch 的行数。mini-batch是使用并行化处理完整数据的基本粒度。每个工作处理器一次处理一个 mini-batch,并且所有工作进程在不同节点上并行工作。

> 0

1

max_retries

定义如果任何 mini-batch 遇到内部异常时的重试次数。

备注:重试粒度基于 mini-batch。例如,使用前面的设置,您可以设置每个 mini-batch 100 行。当一行执行遇到瞬时问题或未处理的异常时,这 100 行将一起重试,即使其余 99 行成功。此外,在大多数情况下,状态码为 429 的 LLM 响应将在流运行中内部处理,并且不会触发 mini-batch 失败。

>= 0

3

error_threshold

定义可接受的失败行数。如果失败行数超过此阈值,作业将停止并标记为失败。设置为 '-1' 以禁用此失败检查。

-1 或 >=0

-1

mini_batch_error_threshold

定义所有重试后可容忍的最大失败 mini-batch 数。设置为 '-1' 以禁用此失败检查。

-1 或 >=0

-1

logging_level

确定并行作业如何将日志保存到磁盘。将流组件设置为 'DEBUG' 允许组件将中间流日志输出到 'debug_info' 端口。

信息、警告、调试

信息

超时

设置每个 mini-batch 执行的超时检查器(以毫秒为单位)。如果 mini-batch 运行时间超过此阈值,它将被标记为失败并触发下一次重试。请根据您的 mini-batch 大小和 LLM 终结点的总流量吞吐量考虑设置更高的值。

> 0

600

# Define the pipeline as a function
@pipeline()
def pipeline_func_with_flow(
    # Function inputs will be treated as pipeline input data or parameters.
    # Pipeline input could be linked to step inputs to pass data between steps.
    # Users are not required to define pipeline inputs.
    # With pipeline inputs, user can provide the different data or values when they trigger different pipeline runs.
    pipeline_input_data: Input,
    parallel_node_count: int = 1,
):
    # Declare pipeline step 'flow_node' by using flow component
    flow_node = flow_component(
        # Bind the pipeline intput data to the port 'data' of the flow component
        # If you don't have pipeline input, you can directly pass the 'data_input' object to the 'data'
        # But with this approach, you can't provide different data when you trigger different pipeline runs.
        # data=data_input,
        data=pipeline_input_data,
        # Declare which column of input data should be mapped to flow input
        # the value pattern follows ${data.<column_name_from_data_input>}
        url="${data.url}",
        # Provide the connection values of the flow component
        # The value of connection and deployment_name should align with your workspace connection settings.
        connections={
            "summarize_text_content": {
                "connection": "azure_open_ai_connection",
                "deployment_name": "gpt-35-turbo",
            },
            "classify_with_llm": {
                "connection": "azure_open_ai_connection",
                "deployment_name": "gpt-35-turbo",
            },
        },
    )

    # Provide run settings of your flow component
    # Only 'compute' is required and other setting will keep default value if not provided.
    flow_node.environment_variables = {
        "PF_INPUT_FORMAT": "jsonl",
    }
    flow_node.compute = "cpu-cluster"
    flow_node.resources = {"instance_count": parallel_node_count}
    flow_node.mini_batch_size = 5
    flow_node.max_concurrency_per_instance = 2
    flow_node.retry_settings = {
        "max_retries": 1,
        "timeout": 1200,
    }
    flow_node.error_threshold = -1
    flow_node.mini_batch_error_threshold = -1
    flow_node.logging_level = "DEBUG"

    # Function return will be treated as pipeline output. This is not required.
    return {"flow_result_folder": flow_node.outputs.flow_outputs}


# create pipeline instance
pipeline_job_def = pipeline_func_with_flow(pipeline_input_data=data_input)
pipeline_job_def.outputs.flow_result_folder = pipeline_output

将管道作业提交到您的工作区,然后通过输出中的链接在 UI 上检查作业状态。

# Submit the pipeline job to your workspace
pipeline_job_run = ml_client.jobs.create_or_update(
    pipeline_job_def, experiment_name="Single_flow_component_pipeline_job"
)
pipeline_job_run

ml_client.jobs.stream(pipeline_job_run.name)

注意

  • mini_batch_size的选择会显著影响流作业的效率。由于每个 mini-batch 中的行按顺序运行,为该参数设置更高的值会增加块大小,从而降低并行化。另一方面,较大的批处理大小也会增加重试成本,因为重试是基于整个 mini-batch。相反,选择最低值(例如 mini_batch_size=1)可能会引入额外的开销,影响编排或结果汇总期间多个 mini-batch 的效率。因此,建议从 10 到 100 之间的值开始,然后根据您的具体要求进行微调。

  • max_concurrency_per_instance设置可以显著提高单个计算节点内的并行效率。但是,它也引入了几个潜在问题:1) 增加内存不足的风险,2) LLM 终结点在同时接收过多请求时可能会遇到限制。通常,建议将 max_concurrency_per_instance 数设置为等于计算核心数,以在并行性和资源限制之间取得平衡。

3.2.2 运行具有多个组件的复杂管道#

在典型的管道中,您会发现包含所有离线业务需求的多个步骤。如果您打算构建一个更复杂的生产管道,请探索以下资源:

此外,请考虑以下示例代码,该代码从存储库加载两个额外的命令组件以构建单个离线管道:

  • data_prep_component:此虚拟数据预处理步骤执行简单的数据采样。

  • result_parser_component:结合源数据、流结果和调试输出,它生成一个包含原始查询、LLM 预测和 LLM 令牌使用情况的单个文件。

# load Azure ML components
data_prep_component = load_component("./components/data-prep/data-prep.yaml")
result_parser_component = load_component(
    "./components/result-parser/result-parser.yaml"
)

# load flow as component
flow_component = load_component("../../flows/standard/web-classification/flow.dag.yaml")


@pipeline()
def pipeline_func_with_flow(pipeline_input_data):
    data_prep_node = data_prep_component(
        input_data_file=pipeline_input_data,
    )
    data_prep_node.compute = "cpu-cluster"

    flow_node = flow_component(
        # Feed the output of data_prep_node to the flow component
        data=data_prep_node.outputs.output_data_folder,
        url="${data.url}",
        connections={
            "summarize_text_content": {
                "connection": "azure_open_ai_connection",
                "deployment_name": "gpt-35-turbo",
            },
            "classify_with_llm": {
                "connection": "azure_open_ai_connection",
                "deployment_name": "gpt-35-turbo",
            },
        },
    )

    flow_node.environment_variables = {"PF_INPUT_FORMAT": "csv"}
    flow_node.compute = "cpu-cluster"
    flow_node.mini_batch_size = 5
    flow_node.max_concurrency_per_instance = 2
    flow_node.resources = {"instance_count": 1}
    flow_node.logging_level = "DEBUG"

    # set output mode to 'mount'
    # This is required for the flow component when the 'instance_count' is set higher than 1
    flow_node.outputs.flow_outputs.mode = "mount"
    flow_node.outputs.debug_info.mode = "mount"

    result_parser_node = result_parser_component(
        source_data=data_prep_node.outputs.output_data_folder,
        pf_output_data=flow_node.outputs.flow_outputs,
        pf_debug_data=flow_node.outputs.debug_info,
    )

    flow_node.retry_settings = {
        "max_retries": 1,
        "timeout": 6000,
    }

    result_parser_node.compute = "cpu-cluster"

    return {"flow_result_folder": result_parser_node.outputs.merged_data}


# create pipeline instance
pipeline_job_def = pipeline_func_with_flow(pipeline_input_data=data_input)
pipeline_job_def.outputs.flow_result_folder = pipeline_output

将管道作业提交到您的工作区,然后通过输出中的链接在 UI 上检查作业状态。

# submit job to workspace
pipeline_job_run = ml_client.jobs.create_or_update(
    pipeline_job_def, experiment_name="Complex_flow_component_pipeline_job"
)
pipeline_job_run

ml_client.jobs.stream(pipeline_job_run.name)

4 后续步骤#

4.1 后续步骤 - 为管道设置调度程序#

Azure Machine Learning 管道支持本机调度程序,帮助用户通过预定义的时间触发器定期运行其管道作业。以下是使用流组件在新创建的管道上设置调度程序的代码示例。

让我们首先声明一个具有自定义重复模式的调度程序。

from datetime import datetime
from azure.ai.ml.entities import JobSchedule, RecurrenceTrigger, RecurrencePattern
from azure.ai.ml.constants import TimeZone

schedule_name = "simple_sdk_create_schedule_recurrence"
schedule_start_time = datetime.utcnow()

recurrence_trigger = RecurrenceTrigger(
    frequency="day",  # could accept "hour", "minute", "day", "week", "month"
    interval=1,
    schedule=RecurrencePattern(hours=10, minutes=[0, 1]),
    start_time=schedule_start_time,
    time_zone=TimeZone.UTC,
)

job_schedule = JobSchedule(
    name=schedule_name,
    trigger=recurrence_trigger,
    # Declare the pipeline job to be scheduled. Here we uses the pipeline job created in previous example.
    create_job=pipeline_job_def,
)

要启动调度程序,请按照此示例操作:

job_schedule = ml_client.schedules.begin_create_or_update(
    schedule=job_schedule
).result()
print(job_schedule)

要查看所有已调度作业,请导航到 Azure Machine Learning 工作区 UI 中的作业列表页面。由调度程序触发的任何作业都将具有以下格式的显示名称:<schedule_name>-<trigger_time>。例如,如果您有一个名为“named-schedule”的调度程序,则在 2021 年 1 月 1 日 06:00:00 UTC 触发的作业将具有显示名称“named-schedule-20210101T060000Z”。

要禁用或关闭正在运行的调度程序,请按照此示例操作:

job_schedule = ml_client.schedules.begin_disable(name=schedule_name).result()
job_schedule.is_enabled

要探索有关调度 Azure Machine Learning 管道作业的更多详细信息,请访问此文章:如何调度管道作业

4.2 后续步骤 - 将管道部署到终结点#

Azure Machine Learning 还提供批处理终结点,使您能够将管道部署到终结点以实现高效的操作。如果您需要使用外部业务流程协调程序(例如 Azure Data Factory 或 Microsoft Fabric)调度流管道,那么使用批处理终结点是您流管道的最佳推荐。

让我们首先在您的工作区中创建一个新的批处理终结点。

from azure.ai.ml.entities import BatchEndpoint, PipelineComponentBatchDeployment

# from azure.ai.ml.entities import ModelBatchDeployment, ModelBatchDeploymentSettings, Model, AmlCompute, Data, BatchRetrySettings, CodeConfiguration, Environment, Data
# from azure.ai.ml.constants import BatchDeploymentOutputAction


endpoint_name = "hello-my-pipeline-endpoint"
endpoint = BatchEndpoint(
    name=endpoint_name,
    description="A hello world endpoint for pipeline",
)

ml_client.batch_endpoints.begin_create_or_update(endpoint).result()

每个终结点可以支持多个部署,每个部署都与不同的管道相关联。在此上下文中,我们使用流管道作业启动新的部署,目标是最近建立的终结点。

deployment = PipelineComponentBatchDeployment(
    name="my-pipeline-deployment",
    description="A hello world deployment with a pipeline job.",
    endpoint_name=endpoint.name,
    # Make sure 'pipeline_job_run' run successfully before deploying the endpoint
    job_definition=pipeline_job_run,
    settings={"default_compute": "cpu-cluster", "continue_on_step_failure": False},
)

ml_client.batch_deployments.begin_create_or_update(deployment).result()

# Refresh the default deployment to the latest one at our endpoint.
endpoint = ml_client.batch_endpoints.get(endpoint.name)
endpoint.defaults.deployment_name = deployment.name
ml_client.batch_endpoints.begin_create_or_update(endpoint).result()

使用适当的数据调用目标终结点的默认部署

batch_endpoint_job = ml_client.batch_endpoints.invoke(
    endpoint_name=endpoint.name,
    inputs={"pipeline_input_data": data_input},
)

最后,使用以下链接在工作区 UI 上验证调用:

ml_client.jobs.get(batch_endpoint_job.name)

要探索有关 Azure Machine Learning 批处理终结点的更多详细信息,请访问此文章:如何使用批处理管道部署