在 Azure ML 管道作业中使用流#

在实际场景中,流具有多种功能。例如,考虑一个专门设计用于评估人类与代理之间通信会话相关性分数的离线流。此流每晚触发一次,并处理大量的会话数据。在这种情况下,并行组件和 AzureML 管道是处理大规模、高弹性、高效离线批处理需求的最佳选择。

开发并彻底测试流后,本指南将引导您将流用作 AzureML 管道作业中的并行组件。

先决条件

若要启用此功能,客户需要:

  1. 安装相关 CLI 或包

    1. 对于 CLI,请先安装 Azure CLI,然后通过az extension add -n ml安装扩展ml>=2.22.0

    2. 对于 SDK,请通过pip install azure-ai-ml>=1.12.0pip install promptflow[azure]安装包azure-ai-ml>=1.12.0

  2. 确保目标源中存在$schema

    1. flow.dag.yaml: $schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json

    2. run.yaml: $schema: https://azuremlschemas.azureedge.net/promptflow/latest/Run.schema.json

  3. 确保元数据已生成并为最新

    1. <my-flow-directory>/.promptflow/flow.tools.json应存在;

    2. 客户可以通过pf flow validate --source <my-flow-directory>更新文件。

要探索在 Azure ML 工作区中运行示例流的可执行端到端示例,您可以参考此教程笔记本:通过管道运行流

有关 AzureML 和组件的更多信息

将流注册为组件#

假设已有一个流,其flow.dag.yaml如下所示

$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
environment:
  python_requirements_txt: requirements.txt
inputs:
  text:
    type: string
    default: Hello World!
outputs:
  output:
    type: string
    reference: ${llm.output}
nodes:
- name: hello_prompt
  type: prompt
  source:
    type: code
    path: hello.jinja2
  inputs:
    text: ${inputs.text}
- name: llm
  type: python
  source:
    type: code
    path: hello.py
  inputs:
    prompt: ${hello_prompt.output}
    deployment_name: text-davinci-003
    max_tokens: "120"

客户可以使用 CLI 或 SDK 将流注册为组件

# Register flow as a component
az ml component create --file \<my-flow-directory\>/flow.dag.yaml

# Register flow as a component and specify its name and version
# Default component name will be the name of flow folder, which can be invalid as a component name; default version will be "1"
az ml component create --file \<my-flow-directory\>/flow.dag.yaml --version 3 --set name=basic_updated
from azure.ai.ml import MLClient, load_component

ml_client = MLClient()

# Register flow as a component
flow_component = load_component("<my-flow-directory>/flow.dag.yaml")
ml_client.components.create_or_update(flow_component)

# Register flow as a component and specify its name and version
# Default component name will be the name of flow folder, which can be invalid as a component name; default version will be "1"
flow_component.name = "basic_updated"
ml_client.components.create_or_update(flow_component, version="3")

生成的组件将是一个并行组件,其定义如下所示

name: basic
version: 1
display_name: basic
is_deterministic: True
type: parallel
inputs:
  data:
    type: uri_folder
    optional: False
  run_outputs:
    type: uri_folder
    optional: True
  text:
    type: string
    optional: False
    default: Hello World!
outputs:
  flow_outputs:
    type: uri_folder
  debug_info:
    type: uri_folder
...

除了固定的输入/输出端口,所有连接和流输入都将作为组件的输入参数公开。默认值可在流/运行定义中提供;它们也可以在作业提交时设置/覆盖。端口的完整描述可在组件端口和运行设置部分中查看。

在管道作业中使用流#

将流注册为组件后,它们可以在管道作业中像常规注册组件一样被引用。客户也可以直接在管道作业中使用流,然后将在作业提交时创建匿名组件。

...
inputs:
  basic_input:
    type: uri_file
    path: <path-to-data>
compute: azureml:cpu-cluster
jobs:
  flow_from_registered:
    type: parallel
    component: azureml:my_flow_component:1
    inputs:
      data: ${{parent.inputs.basic_input}}
      text: "${data.text}"
  flow_from_dag:
    type: parallel
    component: <path-to-flow-dag-yaml>
    inputs:
      data: ${{parent.inputs.basic_input}}
      text: "${data.text}"
  flow_from_run:
    type: parallel
    component: <path-to-run-yaml>
    inputs:
      data: ${{parent.inputs.basic_input}}
      text: "${data.text}"
...

管道作业可以通过az ml job create --file pipeline.yml提交。

完整的示例可以在此处找到。

from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient, load_component, Input
from azure.ai.ml.dsl import pipeline

credential = DefaultAzureCredential()
ml_client = MLClient.from_config(credential=credential)
data_input = Input(path="<path-to-data>", type='uri_file')

flow_component_from_registered = ml_client.components.get("my_flow_component", "1")
flow_component_from_dag = load_component("<path-to-flow-dag-yaml>")
flow_component_from_run = load_component("<path-to-run-yaml>")

@pipeline
def pipeline_func_with_flow(basic_input):
    flow_from_registered = flow_component_from_registered(
        data=data,
        text="${data.text}",
    )
    flow_from_dag = flow_component_from_dag(
        data=data,
        text="${data.text}",
    )
    flow_from_run = flow_component_from_run(
        data=data,
        text="${data.text}",
    )

pipeline_with_flow = pipeline_func_with_flow(basic_input=data_input)
pipeline_with_flow.compute = "cpu-cluster"

pipeline_job = ml_client.jobs.create_or_update(pipeline_with_flow)
ml_client.jobs.stream(pipeline_job.name)

完整的示例可以在此处找到。

与常规并行组件一样,客户可以在管道作业中为它们指定运行设置。一些常用运行设置已在组件端口和运行设置部分中列出;客户还可以参考并行组件的官方文档了解更多详细信息

...
jobs:
  flow_node:
    type: parallel
    component: <path-to-complicated-run-yaml>
    compute: azureml:cpu-cluster
    instance_count: 2
    max_concurrency_per_instance: 2
    mini_batch_error_threshold: 5
    retry_settings:
      max_retries: 3
      timeout: 30
    inputs:
      data: ${{parent.inputs.data}}
      url: "${data.url}"
      connections.summarize_text_content.connection: azure_open_ai_connection
      connections.summarize_text_content.deployment_name: text-davinci-003
      environment_variables.AZURE_OPENAI_API_KEY: ${my_connection.api_key}
      environment_variables.AZURE_OPENAI_API_BASE: ${my_connection.api_base}
...
from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient, load_component, Input
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import RetrySettings

credential = DefaultAzureCredential()
ml_client = MLClient.from_config(credential=credential)
data_input = Input(path="<path-to-data>", type='uri_file')

# Load flow as a component
flow_component = load_component("<path-to-complicated-run-yaml>")

@pipeline
def pipeline_func_with_flow(data):
    flow_node = flow_component(
        data=data,
        url="${data.url}",
        connections={
            "summarize_text_content": {
                "connection": "azure_open_ai_connection",
                "deployment_name": "text-davinci-003",
            },
        },
        environment_variables={
            "AZURE_OPENAI_API_KEY": "${my_connection.api_key}",
            "AZURE_OPENAI_API_BASE": "${my_connection.api_base}",
        }
    )
    flow_node.compute = "cpu-cluster"
    flow_node.instance_count = 2
    flow_node.max_concurrency_per_instance = 2
    flow_node.mini_batch_error_threshold = 5
    flow_node.retry_settings = RetrySettings(timeout=30, max_retries=5)

pipeline_with_flow = pipeline_func_with_flow(data=data_input)

pipeline_job = ml_client.jobs.create_or_update(pipeline_with_flow)
ml_client.jobs.stream(pipeline_job.name)

组件的环境#

默认情况下,创建的组件的环境将基于最新的 promptflow 运行时映像。如果客户在flow.dag.yaml指定了 python 需求文件,它们将自动应用于环境

...
environment:
  python_requirements_txt: requirements.txt

如果客户希望使用现有的 Azure ML 环境或以 Azure ML 样式定义环境,他们可以在run.yaml中定义,如下所示

$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Run.schema.json
flow: <my-flow-directory>
azureml:
  environment: azureml:my-environment:1

有关 Azure ML 环境支持格式的更多详细信息,请参阅此文档

Prompt flow 中的流与管道作业中的流之间的差异#

在 prompt flow 中,流在为 prompt flow 设计的计算会话上运行;而在管道作业中,流在不同类型的计算(通常是计算集群)上运行。

鉴于上述情况,如果您的流具有依赖于身份或环境变量的逻辑,请注意此差异,因为当流在管道作业中运行时,您可能会遇到一些意外错误,并且您可能需要一些额外的配置才能使其正常工作。

组件端口和运行设置#

输入端口#

类型

描述

数据

固定

uri_folder 或 uri_file

必需;用于传入输入数据。支持的格式包括mltable和 jsonl 文件列表。

run_outputs

固定

uri_folder

可选;用于传入标准流的输出,用于评估流。应链接到管道中先前流节点的flow_outputs

输出端口#

类型

描述

flow_outputs

固定

uri_folder

一个包含 1 个或多个 jsonl 文件(包含流运行的输出)的 uri_folder

debug_info

固定

uri_folder

一个包含流运行调试信息(例如运行日志)的 uri_folder

参数#

类型

描述

来自流输入

字符串

默认值将从流输入继承;用于覆盖流输入的列映射

connections..connection

来自内置 LLM 工具的节点

字符串

默认值将是flow.dag.yamlrun.yaml中定义的当前值;用于覆盖相应节点使用的连接。连接应存在于当前工作区中。

connections..deployment_name

来自内置 LLM 工具的节点

字符串

默认值将是flow.dag.yamlrun.yaml中定义的当前值;用于覆盖相应节点的目标部署名称。部署应可通过提供的连接获得。

connections..

来自具有Connection输入的节点

字符串

默认值将是flow.dag.yamlrun.yaml中定义的当前值;用于覆盖相应节点使用的连接。连接应存在于当前工作区中。

environment_variables.

来自run.yaml中定义的环境变量

字符串

默认值将是run.yaml中定义的当前值;用于在流运行期间覆盖环境变量,例如 AZURE_OPENAI_API_KEY。请注意,您可以使用{my_connection.api_key}等表达式引用工作区连接。

运行设置#

类型

描述

instance_count

整数

用于作业的节点数。默认值为 1。

max_concurrency_per_instance

整数

每个节点上的处理器数量。

mini_batch_error_threshold

整数

定义此并行作业中可以忽略的失败小批次的数量。如果失败小批次的计数高于此阈值,则并行作业将被标记为失败。

小批次被标记为失败,如果
- run() 的返回值计数小于小批次输入计数。
- 在自定义 run() 代码中捕获异常。

“-1”是默认值,这意味着在并行作业期间忽略所有失败的小批次。

retry_settings.max_retries

整数

定义当小批次失败或超时时重试的次数。如果所有重试都失败,小批次将被标记为失败,并由mini_batch_error_threshold计算。

retry_settings.timeout

整数

定义执行自定义 run() 函数的超时时间(秒)。如果执行时间高于此阈值,小批次将被中止,并标记为失败小批次以触发重试。

logging_level

INFO、WARNING 或 DEBUG

定义哪些级别的日志将转储到用户日志文件中。

有关更多运行设置,请查看并行组件的官方文档