在 Azure ML 管道作业中使用流#
在实际场景中,流具有多种功能。例如,考虑一个专门设计用于评估人类与代理之间通信会话相关性分数的离线流。此流每晚触发一次,并处理大量的会话数据。在这种情况下,并行组件和 AzureML 管道是处理大规模、高弹性、高效离线批处理需求的最佳选择。
开发并彻底测试流后,本指南将引导您将流用作 AzureML 管道作业中的并行组件。
先决条件
若要启用此功能,客户需要:
安装相关 CLI 或包
对于 CLI,请先安装 Azure CLI,然后通过
az extension add -n ml
安装扩展ml>=2.22.0
;对于 SDK,请通过
pip install azure-ai-ml>=1.12.0
或pip install promptflow[azure]
安装包azure-ai-ml>=1.12.0
;
确保目标源中存在
$schema
flow.dag.yaml
:$schema
:https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
run.yaml
:$schema
:https://azuremlschemas.azureedge.net/promptflow/latest/Run.schema.json
确保元数据已生成并为最新
<my-flow-directory>/.promptflow/flow.tools.json
应存在;客户可以通过
pf flow validate --source <my-flow-directory>
更新文件。
要探索在 Azure ML 工作区中运行示例流的可执行端到端示例,您可以参考此教程笔记本:通过管道运行流
有关 AzureML 和组件的更多信息
将流注册为组件#
假设已有一个流,其flow.dag.yaml
如下所示
$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
environment:
python_requirements_txt: requirements.txt
inputs:
text:
type: string
default: Hello World!
outputs:
output:
type: string
reference: ${llm.output}
nodes:
- name: hello_prompt
type: prompt
source:
type: code
path: hello.jinja2
inputs:
text: ${inputs.text}
- name: llm
type: python
source:
type: code
path: hello.py
inputs:
prompt: ${hello_prompt.output}
deployment_name: text-davinci-003
max_tokens: "120"
客户可以使用 CLI 或 SDK 将流注册为组件
# Register flow as a component
az ml component create --file \<my-flow-directory\>/flow.dag.yaml
# Register flow as a component and specify its name and version
# Default component name will be the name of flow folder, which can be invalid as a component name; default version will be "1"
az ml component create --file \<my-flow-directory\>/flow.dag.yaml --version 3 --set name=basic_updated
from azure.ai.ml import MLClient, load_component
ml_client = MLClient()
# Register flow as a component
flow_component = load_component("<my-flow-directory>/flow.dag.yaml")
ml_client.components.create_or_update(flow_component)
# Register flow as a component and specify its name and version
# Default component name will be the name of flow folder, which can be invalid as a component name; default version will be "1"
flow_component.name = "basic_updated"
ml_client.components.create_or_update(flow_component, version="3")
生成的组件将是一个并行组件,其定义如下所示
name: basic
version: 1
display_name: basic
is_deterministic: True
type: parallel
inputs:
data:
type: uri_folder
optional: False
run_outputs:
type: uri_folder
optional: True
text:
type: string
optional: False
default: Hello World!
outputs:
flow_outputs:
type: uri_folder
debug_info:
type: uri_folder
...
除了固定的输入/输出端口,所有连接和流输入都将作为组件的输入参数公开。默认值可在流/运行定义中提供;它们也可以在作业提交时设置/覆盖。端口的完整描述可在组件端口和运行设置部分中查看。
在管道作业中使用流#
将流注册为组件后,它们可以在管道作业中像常规注册组件一样被引用。客户也可以直接在管道作业中使用流,然后将在作业提交时创建匿名组件。
...
inputs:
basic_input:
type: uri_file
path: <path-to-data>
compute: azureml:cpu-cluster
jobs:
flow_from_registered:
type: parallel
component: azureml:my_flow_component:1
inputs:
data: ${{parent.inputs.basic_input}}
text: "${data.text}"
flow_from_dag:
type: parallel
component: <path-to-flow-dag-yaml>
inputs:
data: ${{parent.inputs.basic_input}}
text: "${data.text}"
flow_from_run:
type: parallel
component: <path-to-run-yaml>
inputs:
data: ${{parent.inputs.basic_input}}
text: "${data.text}"
...
管道作业可以通过az ml job create --file pipeline.yml
提交。
完整的示例可以在此处找到。
from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient, load_component, Input
from azure.ai.ml.dsl import pipeline
credential = DefaultAzureCredential()
ml_client = MLClient.from_config(credential=credential)
data_input = Input(path="<path-to-data>", type='uri_file')
flow_component_from_registered = ml_client.components.get("my_flow_component", "1")
flow_component_from_dag = load_component("<path-to-flow-dag-yaml>")
flow_component_from_run = load_component("<path-to-run-yaml>")
@pipeline
def pipeline_func_with_flow(basic_input):
flow_from_registered = flow_component_from_registered(
data=data,
text="${data.text}",
)
flow_from_dag = flow_component_from_dag(
data=data,
text="${data.text}",
)
flow_from_run = flow_component_from_run(
data=data,
text="${data.text}",
)
pipeline_with_flow = pipeline_func_with_flow(basic_input=data_input)
pipeline_with_flow.compute = "cpu-cluster"
pipeline_job = ml_client.jobs.create_or_update(pipeline_with_flow)
ml_client.jobs.stream(pipeline_job.name)
完整的示例可以在此处找到。
与常规并行组件一样,客户可以在管道作业中为它们指定运行设置。一些常用运行设置已在组件端口和运行设置部分中列出;客户还可以参考并行组件的官方文档了解更多详细信息
...
jobs:
flow_node:
type: parallel
component: <path-to-complicated-run-yaml>
compute: azureml:cpu-cluster
instance_count: 2
max_concurrency_per_instance: 2
mini_batch_error_threshold: 5
retry_settings:
max_retries: 3
timeout: 30
inputs:
data: ${{parent.inputs.data}}
url: "${data.url}"
connections.summarize_text_content.connection: azure_open_ai_connection
connections.summarize_text_content.deployment_name: text-davinci-003
environment_variables.AZURE_OPENAI_API_KEY: ${my_connection.api_key}
environment_variables.AZURE_OPENAI_API_BASE: ${my_connection.api_base}
...
from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient, load_component, Input
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import RetrySettings
credential = DefaultAzureCredential()
ml_client = MLClient.from_config(credential=credential)
data_input = Input(path="<path-to-data>", type='uri_file')
# Load flow as a component
flow_component = load_component("<path-to-complicated-run-yaml>")
@pipeline
def pipeline_func_with_flow(data):
flow_node = flow_component(
data=data,
url="${data.url}",
connections={
"summarize_text_content": {
"connection": "azure_open_ai_connection",
"deployment_name": "text-davinci-003",
},
},
environment_variables={
"AZURE_OPENAI_API_KEY": "${my_connection.api_key}",
"AZURE_OPENAI_API_BASE": "${my_connection.api_base}",
}
)
flow_node.compute = "cpu-cluster"
flow_node.instance_count = 2
flow_node.max_concurrency_per_instance = 2
flow_node.mini_batch_error_threshold = 5
flow_node.retry_settings = RetrySettings(timeout=30, max_retries=5)
pipeline_with_flow = pipeline_func_with_flow(data=data_input)
pipeline_job = ml_client.jobs.create_or_update(pipeline_with_flow)
ml_client.jobs.stream(pipeline_job.name)
组件的环境#
默认情况下,创建的组件的环境将基于最新的 promptflow 运行时映像。如果客户在flow.dag.yaml
中指定了 python 需求文件,它们将自动应用于环境
...
environment:
python_requirements_txt: requirements.txt
如果客户希望使用现有的 Azure ML 环境或以 Azure ML 样式定义环境,他们可以在run.yaml
中定义,如下所示
$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Run.schema.json
flow: <my-flow-directory>
azureml:
environment: azureml:my-environment:1
有关 Azure ML 环境支持格式的更多详细信息,请参阅此文档。
Prompt flow 中的流与管道作业中的流之间的差异#
在 prompt flow 中,流在为 prompt flow 设计的计算会话上运行;而在管道作业中,流在不同类型的计算(通常是计算集群)上运行。
鉴于上述情况,如果您的流具有依赖于身份或环境变量的逻辑,请注意此差异,因为当流在管道作业中运行时,您可能会遇到一些意外错误,并且您可能需要一些额外的配置才能使其正常工作。
组件端口和运行设置#
输入端口#
键 |
源 |
类型 |
描述 |
---|---|---|---|
数据 |
固定 |
uri_folder 或 uri_file |
必需;用于传入输入数据。支持的格式包括 |
run_outputs |
固定 |
uri_folder |
可选;用于传入标准流的输出,用于评估流。应链接到管道中先前流节点的 |
输出端口#
键 |
源 |
类型 |
描述 |
---|---|---|---|
flow_outputs |
固定 |
uri_folder |
一个包含 1 个或多个 jsonl 文件(包含流运行的输出)的 uri_folder |
debug_info |
固定 |
uri_folder |
一个包含流运行调试信息(例如运行日志)的 uri_folder |
参数#
键 |
源 |
类型 |
描述 |
---|---|---|---|
来自流输入 |
字符串 |
默认值将从流输入继承;用于覆盖流输入的列映射。 |
|
connections. |
来自内置 LLM 工具的节点 |
字符串 |
默认值将是 |
connections. |
来自内置 LLM 工具的节点 |
字符串 |
默认值将是 |
connections. |
来自具有 |
字符串 |
默认值将是 |
environment_variables. |
来自 |
字符串 |
默认值将是 |
运行设置#
键 |
类型 |
描述 |
---|---|---|
instance_count |
整数 |
用于作业的节点数。默认值为 1。 |
max_concurrency_per_instance |
整数 |
每个节点上的处理器数量。 |
mini_batch_error_threshold |
整数 |
定义此并行作业中可以忽略的失败小批次的数量。如果失败小批次的计数高于此阈值,则并行作业将被标记为失败。 |
retry_settings.max_retries |
整数 |
定义当小批次失败或超时时重试的次数。如果所有重试都失败,小批次将被标记为失败,并由 |
retry_settings.timeout |
整数 |
定义执行自定义 run() 函数的超时时间(秒)。如果执行时间高于此阈值,小批次将被中止,并标记为失败小批次以触发重试。 |
logging_level |
INFO、WARNING 或 DEBUG |
定义哪些级别的日志将转储到用户日志文件中。 |
有关更多运行设置,请查看并行组件的官方文档。