基于类的流#
实验性功能
这是一个实验性功能,随时可能更改。了解更多。
当用户需要在多次流运行中将对象(如连接)持久化到内存中时,他们可以将可调用类编写为流入口,并将持久化参数放入 __init__
方法中。
如果用户需要在批处理运行输出上记录指标,他们可以添加一个 __aggregate__
方法,该方法将在批处理运行结束后调度。 __aggregate__
方法应仅包含 1 个参数,即批处理运行结果列表。
将类作为流#
假设我们有一个文件 flow_entry.py
class Reply(TypedDict):
output: str
class MyFlow:
def __init__(self, model_config: AzureOpenAIModelConfiguration, flow_config: dict):
"""Flow initialization logic goes here."""
self.model_config = model_config
self.flow_config = flow_config
def __call__(question: str) -> Reply:
"""Flow execution logic goes here."""
return Reply(output=output)
def __aggregate__(self, line_results: List[str]) -> dict:
"""Aggregation logic goes here. Return key-value pair as metrics."""
return {"key": val}
流测试#
使用原始代码测试#
由于流的定义是函数/可调用类。我们建议用户像运行其他脚本一样直接运行它
class MyFlow:
pass
if __name__ == "__main__":
flow = MyFlow(model_config, flow_config)
output = flow(question)
metrics = flow.__aggregate__([output])
# check metrics here
通过函数调用测试#
它还支持将您的类入口转换为流,并使用 prompt flow 的能力进行测试。
您可以使用以下 CLI 进行测试
# flow entry syntax: path.to.module:ClassName
pf flow test --flow flow_entry:MyFlow --inputs question="What's the capital of France?" --init init.json
注意:目前此命令将在您的工作目录中生成一个 flow.flex.yaml。它将成为流的入口。
在此处查看完整示例:basic-chat
与流聊天#
支持在 CLI 中与流聊天
pf flow test --flow flow_entry:MyFlow --inputs inputs.json --init init.json --ui
有关更多信息,请查看此处。
批处理运行#
用户还可以批处理运行流。
pf run create --flow "path.to.module:ClassName" --data "./data.jsonl"
# user can also directly use entry in `flow` param for batch run
pf.run(flow="path.to.module:ClassName", init="./init.jsonl", data="./data.jsonl")
或者直接运行导入的流类或流实例。
from promptflow.core import AzureOpenAIModelConfiguration
class MyFlow:
pass
config = AzureOpenAIModelConfiguration(
azure_deployment="my_deployment",
# connection and api_key configs are exclusive
connection="my_aoai_connection",
api_key="actual_key",
)
pf.run(flow=MyFlow, init={"model_config": config, "flow_config": {}}, data="./data.jsonl")
# or
flow_obj = MyFlow(model_config=config, flow_config={})
pf.run(flow=flow_obj, data="./data.jsonl")
有关此主题的更多信息,请参阅运行和评估流
定义流 YAML#
用户可以手动编写名为 flow.flex.yaml
的 YAML 文件,或将函数/可调用入口保存到 YAML 文件中。这是部署或在云中运行等高级场景所必需的。流 YAML 可能如下所示
$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
entry: path.to.module:ClassName
使用 YAML 批处理运行#
用户可以批处理运行流。流 init 函数的参数由 init
参数支持。
用户需要编写一个 JSON 文件作为 init 的值,因为在命令行中编写模型配置很困难。
{
"model_config": {
"azure_endpoint": "my_endpoint",
"azure_deployment": "my_deployment",
"api_key": "actual_api_key"
},
"flow_config": {}
}
pf run create --flow "./flow.flex.yaml" --data "./data.jsonl" --init init.json
pf = PFClient()
config = AzureOpenAIModelConfiguration(
azure_deployment="my_deployment",
api_key="actual_key"
)
# if init's value is not json serializable, raise user error
pf.run(flow="./flow.flex.yaml", init={"model_config": config, "flow_config": {}}, data="./data.jsonl")
# when submit to cloud, user can only use connection
# in runtime executor will resolve connection in AzureOpenAIModelConfiguration and set connection's fields to ModelConfig: equal to original ModelConfiguration.from_connection()
config = AzureOpenAIModelConfiguration(
azure_deployment="my_embedding_deployment",
connection="my-aoai-connection",
)
pfazure.run(flow="./flow.flex.yaml", init={"model_config": config, "flow_config": {}}, data="./data.jsonl")
部署流#
用户可以服务流。流 init 函数的参数由 init
参数支持。流应在 YAML 中具有完整的 init/inputs/outputs 规范,以确保可以生成服务 swagger。
用户需要编写一个 JSON 文件作为 init 的值,因为在命令行中编写模型配置很困难。
{
"model_config": {
"azure_endpoint": "my_endpoint",
"azure_deployment": "my_deployment",
"api_key": "actual_api_key"
},
"flow_config": {}
}
# user can only pass model config by file
pf flow serve --source "./" --port 8088 --host localhost --init path/to/init.json
了解更多:部署流。
聚合支持#
引入聚合支持是为了帮助用户计算指标。
class MyFlow:
def __call__(text: str) -> str:
"""Flow execution logic goes here."""
pass
# will only execute once after batch run finished.
# the processed_results will be list of __call__'s output and we will log the return value as metrics automatically.
def __aggregate__(self, processed_results: List[str]) -> dict:
for element in processed_results:
# If __call__'s output is primitive type, element will be primitive type.
# If __call__'s output is dataclass, element will be a dictionary, but can access it's attribute with `element.attribute_name`
# For other cases, it's recommended to access by key `element["attribute_name"]`
注意:
聚合支持有一些限制
聚合函数只会在批处理运行中执行。
仅支持 1 个硬编码的
__aggregate__
函数。在执行时,
__aggregate__
将只传递 1 个位置参数。聚合函数的输入将是流运行的输出列表。
传递给
__aggregate__
函数的processed_results
中的每个元素与每行的__call__
返回的对象不相同。重构的元素是一个字典,它支持 1 层属性访问。但建议通过键访问它们。有关用法,请参阅上面的示例。
如果聚合函数接受多个参数,则在提交阶段引发错误。